// This repo has no description.
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() { 20 return first_ip.trim().to_string(); 21 } 22 if let Some(real_ip) = headers.get("x-real-ip") 23 && let Ok(value) = real_ip.to_str() { 24 return value.trim().to_string(); 25 } 26 "unknown".to_string() 27} 28 29fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 30 let suffix = format!(".{}", pds_hostname); 31 if identifier.ends_with(&suffix) { 32 identifier[..identifier.len() - suffix.len()].to_string() 33 } else { 34 identifier.to_string() 35 } 36} 37 38fn full_handle(stored_handle: &str, pds_hostname: &str) -> String { 39 if stored_handle.contains('.') { 40 stored_handle.to_string() 41 } else { 42 format!("{}.{}", stored_handle, pds_hostname) 43 } 44} 45 46#[derive(Deserialize)] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateSessionOutput { 55 pub access_jwt: String, 56 pub refresh_jwt: String, 57 pub handle: String, 58 pub did: String, 59} 60 61pub async fn create_session( 62 State(state): State<AppState>, 63 headers: HeaderMap, 64 Json(input): Json<CreateSessionInput>, 65) -> Response { 66 info!("create_session called"); 67 let client_ip = extract_client_ip(&headers); 68 if !state 69 .check_rate_limit(RateLimitKind::Login, &client_ip) 70 .await 71 { 72 warn!(ip = %client_ip, "Login rate limit exceeded"); 73 return ( 74 
StatusCode::TOO_MANY_REQUESTS, 75 Json(json!({ 76 "error": "RateLimitExceeded", 77 "message": "Too many login attempts. Please try again later." 78 })), 79 ) 80 .into_response(); 81 } 82 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 83 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 84 let row = match sqlx::query!( 85 r#"SELECT 86 u.id, u.did, u.handle, u.password_hash, 87 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 88 k.key_bytes, k.encryption_version 89 FROM users u 90 JOIN user_keys k ON u.id = k.user_id 91 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 92 normalized_identifier 93 ) 94 .fetch_optional(&state.db) 95 .await 96 { 97 Ok(Some(row)) => row, 98 Ok(None) => { 99 let _ = verify( 100 &input.password, 101 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 102 ); 103 warn!("User not found for login attempt"); 104 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 105 .into_response(); 106 } 107 Err(e) => { 108 error!("Database error fetching user: {:?}", e); 109 return ApiError::InternalError.into_response(); 110 } 111 }; 112 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 113 Ok(k) => k, 114 Err(e) => { 115 error!("Failed to decrypt user key: {:?}", e); 116 return ApiError::InternalError.into_response(); 117 } 118 }; 119 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 120 true 121 } else { 122 let app_passwords = sqlx::query!( 123 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 124 row.id 125 ) 126 .fetch_all(&state.db) 127 .await 128 .unwrap_or_default(); 129 app_passwords 130 .iter() 131 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 132 }; 133 if !password_valid { 134 warn!("Password verification failed for login attempt"); 135 return 
ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 136 .into_response(); 137 } 138 let is_verified = 139 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 140 if !is_verified { 141 warn!("Login attempt for unverified account: {}", row.did); 142 return ( 143 StatusCode::FORBIDDEN, 144 Json(json!({ 145 "error": "AccountNotVerified", 146 "message": "Please verify your account before logging in", 147 "did": row.did 148 })), 149 ) 150 .into_response(); 151 } 152 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 153 Ok(m) => m, 154 Err(e) => { 155 error!("Failed to create access token: {:?}", e); 156 return ApiError::InternalError.into_response(); 157 } 158 }; 159 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 160 Ok(m) => m, 161 Err(e) => { 162 error!("Failed to create refresh token: {:?}", e); 163 return ApiError::InternalError.into_response(); 164 } 165 }; 166 if let Err(e) = sqlx::query!( 167 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 168 row.did, 169 access_meta.jti, 170 refresh_meta.jti, 171 access_meta.expires_at, 172 refresh_meta.expires_at 173 ) 174 .execute(&state.db) 175 .await 176 { 177 error!("Failed to insert session: {:?}", e); 178 return ApiError::InternalError.into_response(); 179 } 180 let handle = full_handle(&row.handle, &pds_hostname); 181 Json(CreateSessionOutput { 182 access_jwt: access_meta.token, 183 refresh_jwt: refresh_meta.token, 184 handle, 185 did: row.did, 186 }) 187 .into_response() 188} 189 190pub async fn get_session( 191 State(state): State<AppState>, 192 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 193) -> Response { 194 match sqlx::query!( 195 r#"SELECT 196 handle, email, email_verified, is_admin, deactivated_at, 197 preferred_comms_channel as "preferred_channel: 
crate::comms::CommsChannel", 198 discord_verified, telegram_verified, signal_verified 199 FROM users WHERE did = $1"#, 200 auth_user.did 201 ) 202 .fetch_optional(&state.db) 203 .await 204 { 205 Ok(Some(row)) => { 206 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 207 crate::comms::CommsChannel::Email => ("email", row.email_verified), 208 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 209 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 210 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 211 }; 212 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 213 let handle = full_handle(&row.handle, &pds_hostname); 214 let is_active = row.deactivated_at.is_none(); 215 Json(json!({ 216 "handle": handle, 217 "did": auth_user.did, 218 "email": row.email, 219 "emailVerified": row.email_verified, 220 "preferredChannel": preferred_channel, 221 "preferredChannelVerified": preferred_channel_verified, 222 "isAdmin": row.is_admin, 223 "active": is_active, 224 "status": if is_active { "active" } else { "deactivated" }, 225 "didDoc": {} 226 })).into_response() 227 } 228 Ok(None) => ApiError::AuthenticationFailed.into_response(), 229 Err(e) => { 230 error!("Database error in get_session: {:?}", e); 231 ApiError::InternalError.into_response() 232 } 233 } 234} 235 236pub async fn delete_session( 237 State(state): State<AppState>, 238 headers: axum::http::HeaderMap, 239) -> Response { 240 let token = match crate::auth::extract_bearer_token_from_header( 241 headers.get("Authorization").and_then(|h| h.to_str().ok()), 242 ) { 243 Some(t) => t, 244 None => return ApiError::AuthenticationRequired.into_response(), 245 }; 246 let jti = match crate::auth::get_jti_from_token(&token) { 247 Ok(jti) => jti, 248 Err(_) => return ApiError::AuthenticationFailed.into_response(), 249 }; 250 let did = crate::auth::get_did_from_token(&token).ok(); 251 
match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 252 .execute(&state.db) 253 .await 254 { 255 Ok(res) if res.rows_affected() > 0 => { 256 if let Some(did) = did { 257 let session_cache_key = format!("auth:session:{}:{}", did, jti); 258 let _ = state.cache.delete(&session_cache_key).await; 259 } 260 Json(json!({})).into_response() 261 } 262 Ok(_) => ApiError::AuthenticationFailed.into_response(), 263 Err(e) => { 264 error!("Database error in delete_session: {:?}", e); 265 ApiError::AuthenticationFailed.into_response() 266 } 267 } 268} 269 270pub async fn refresh_session( 271 State(state): State<AppState>, 272 headers: axum::http::HeaderMap, 273) -> Response { 274 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 275 if !state 276 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 277 .await 278 { 279 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 280 return ( 281 axum::http::StatusCode::TOO_MANY_REQUESTS, 282 axum::Json(serde_json::json!({ 283 "error": "RateLimitExceeded", 284 "message": "Too many requests. Please try again later." 
285 })), 286 ) 287 .into_response(); 288 } 289 let refresh_token = match crate::auth::extract_bearer_token_from_header( 290 headers.get("Authorization").and_then(|h| h.to_str().ok()), 291 ) { 292 Some(t) => t, 293 None => return ApiError::AuthenticationRequired.into_response(), 294 }; 295 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 296 Ok(jti) => jti, 297 Err(_) => { 298 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 299 .into_response(); 300 } 301 }; 302 let mut tx = match state.db.begin().await { 303 Ok(tx) => tx, 304 Err(e) => { 305 error!("Failed to begin transaction: {:?}", e); 306 return ApiError::InternalError.into_response(); 307 } 308 }; 309 if let Ok(Some(session_id)) = sqlx::query_scalar!( 310 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 311 refresh_jti 312 ) 313 .fetch_optional(&mut *tx) 314 .await 315 { 316 warn!( 317 "Refresh token reuse detected! Revoking token family for session_id: {}", 318 session_id 319 ); 320 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 321 .execute(&mut *tx) 322 .await; 323 let _ = tx.commit().await; 324 return ApiError::ExpiredTokenMsg( 325 "Refresh token has been revoked due to suspected compromise".into(), 326 ) 327 .into_response(); 328 } 329 let session_row = match sqlx::query!( 330 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 331 FROM session_tokens st 332 JOIN users u ON st.did = u.did 333 JOIN user_keys k ON u.id = k.user_id 334 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 335 FOR UPDATE OF st"#, 336 refresh_jti 337 ) 338 .fetch_optional(&mut *tx) 339 .await 340 { 341 Ok(Some(row)) => row, 342 Ok(None) => { 343 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 344 .into_response(); 345 } 346 Err(e) => { 347 error!("Database error fetching session: {:?}", e); 348 return ApiError::InternalError.into_response(); 349 } 350 }; 351 let key_bytes = 352 
match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 353 Ok(k) => k, 354 Err(e) => { 355 error!("Failed to decrypt user key: {:?}", e); 356 return ApiError::InternalError.into_response(); 357 } 358 }; 359 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 360 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 361 } 362 let new_access_meta = 363 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 364 Ok(m) => m, 365 Err(e) => { 366 error!("Failed to create access token: {:?}", e); 367 return ApiError::InternalError.into_response(); 368 } 369 }; 370 let new_refresh_meta = 371 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 372 Ok(m) => m, 373 Err(e) => { 374 error!("Failed to create refresh token: {:?}", e); 375 return ApiError::InternalError.into_response(); 376 } 377 }; 378 match sqlx::query!( 379 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 380 refresh_jti, 381 session_row.id 382 ) 383 .execute(&mut *tx) 384 .await 385 { 386 Ok(result) if result.rows_affected() == 0 => { 387 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 388 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 389 .execute(&mut *tx) 390 .await; 391 let _ = tx.commit().await; 392 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 393 } 394 Err(e) => { 395 error!("Failed to record used refresh token: {:?}", e); 396 return ApiError::InternalError.into_response(); 397 } 398 Ok(_) => {} 399 } 400 if let Err(e) = sqlx::query!( 401 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 402 new_access_meta.jti, 403 new_refresh_meta.jti, 404 
new_access_meta.expires_at, 405 new_refresh_meta.expires_at, 406 session_row.id 407 ) 408 .execute(&mut *tx) 409 .await 410 { 411 error!("Database error updating session: {:?}", e); 412 return ApiError::InternalError.into_response(); 413 } 414 if let Err(e) = tx.commit().await { 415 error!("Failed to commit transaction: {:?}", e); 416 return ApiError::InternalError.into_response(); 417 } 418 match sqlx::query!( 419 r#"SELECT 420 handle, email, email_verified, is_admin, 421 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 422 discord_verified, telegram_verified, signal_verified 423 FROM users WHERE did = $1"#, 424 session_row.did 425 ) 426 .fetch_optional(&state.db) 427 .await 428 { 429 Ok(Some(u)) => { 430 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 431 crate::comms::CommsChannel::Email => ("email", u.email_verified), 432 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 433 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 434 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 435 }; 436 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 437 let handle = full_handle(&u.handle, &pds_hostname); 438 Json(json!({ 439 "accessJwt": new_access_meta.token, 440 "refreshJwt": new_refresh_meta.token, 441 "handle": handle, 442 "did": session_row.did, 443 "email": u.email, 444 "emailVerified": u.email_verified, 445 "preferredChannel": preferred_channel, 446 "preferredChannelVerified": preferred_channel_verified, 447 "isAdmin": u.is_admin, 448 "active": true 449 })).into_response() 450 } 451 Ok(None) => { 452 error!("User not found for existing session: {}", session_row.did); 453 ApiError::InternalError.into_response() 454 } 455 Err(e) => { 456 error!("Database error fetching user: {:?}", e); 457 ApiError::InternalError.into_response() 458 } 459 } 460} 461 462#[derive(Deserialize)] 463#[serde(rename_all = 
"camelCase")] 464pub struct ConfirmSignupInput { 465 pub did: String, 466 pub verification_code: String, 467} 468 469#[derive(Serialize)] 470#[serde(rename_all = "camelCase")] 471pub struct ConfirmSignupOutput { 472 pub access_jwt: String, 473 pub refresh_jwt: String, 474 pub handle: String, 475 pub did: String, 476 pub email: Option<String>, 477 pub email_verified: bool, 478 pub preferred_channel: String, 479 pub preferred_channel_verified: bool, 480} 481 482pub async fn confirm_signup( 483 State(state): State<AppState>, 484 Json(input): Json<ConfirmSignupInput>, 485) -> Response { 486 info!("confirm_signup called for DID: {}", input.did); 487 let row = match sqlx::query!( 488 r#"SELECT 489 u.id, u.did, u.handle, u.email, 490 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 491 k.key_bytes, k.encryption_version 492 FROM users u 493 JOIN user_keys k ON u.id = k.user_id 494 WHERE u.did = $1"#, 495 input.did 496 ) 497 .fetch_optional(&state.db) 498 .await 499 { 500 Ok(Some(row)) => row, 501 Ok(None) => { 502 warn!("User not found for confirm_signup: {}", input.did); 503 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response(); 504 } 505 Err(e) => { 506 error!("Database error in confirm_signup: {:?}", e); 507 return ApiError::InternalError.into_response(); 508 } 509 }; 510 511 let verification = match sqlx::query!( 512 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 513 row.id 514 ) 515 .fetch_optional(&state.db) 516 .await 517 { 518 Ok(Some(v)) => v, 519 Ok(None) => { 520 warn!("No verification code found for user: {}", input.did); 521 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 522 } 523 Err(e) => { 524 error!("Database error fetching verification: {:?}", e); 525 return ApiError::InternalError.into_response(); 526 } 527 }; 528 529 if verification.code != input.verification_code { 530 warn!("Invalid verification code for user: 
{}", input.did); 531 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 532 } 533 if verification.expires_at < Utc::now() { 534 warn!("Verification code expired for user: {}", input.did); 535 return ApiError::ExpiredTokenMsg("Verification code has expired".into()) 536 .into_response(); 537 } 538 539 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 540 Ok(k) => k, 541 Err(e) => { 542 error!("Failed to decrypt user key: {:?}", e); 543 return ApiError::InternalError.into_response(); 544 } 545 }; 546 let verified_column = match row.channel { 547 crate::comms::CommsChannel::Email => "email_verified", 548 crate::comms::CommsChannel::Discord => "discord_verified", 549 crate::comms::CommsChannel::Telegram => "telegram_verified", 550 crate::comms::CommsChannel::Signal => "signal_verified", 551 }; 552 let update_query = format!( 553 "UPDATE users SET {} = TRUE WHERE did = $1", 554 verified_column 555 ); 556 if let Err(e) = sqlx::query(&update_query) 557 .bind(&input.did) 558 .execute(&state.db) 559 .await 560 { 561 error!("Failed to update verification status: {:?}", e); 562 return ApiError::InternalError.into_response(); 563 } 564 565 if let Err(e) = sqlx::query!( 566 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 567 row.id 568 ) 569 .execute(&state.db) 570 .await { 571 error!("Failed to delete verification record: {:?}", e); 572 } 573 574 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 575 Ok(m) => m, 576 Err(e) => { 577 error!("Failed to create access token: {:?}", e); 578 return ApiError::InternalError.into_response(); 579 } 580 }; 581 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 582 Ok(m) => m, 583 Err(e) => { 584 error!("Failed to create refresh token: {:?}", e); 585 return ApiError::InternalError.into_response(); 586 } 587 }; 588 if let Err(e) = sqlx::query!( 589 
"INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 590 row.did, 591 access_meta.jti, 592 refresh_meta.jti, 593 access_meta.expires_at, 594 refresh_meta.expires_at 595 ) 596 .execute(&state.db) 597 .await 598 { 599 error!("Failed to insert session: {:?}", e); 600 return ApiError::InternalError.into_response(); 601 } 602 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 603 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 604 warn!("Failed to enqueue welcome notification: {:?}", e); 605 } 606 let email_verified = matches!( 607 row.channel, 608 crate::comms::CommsChannel::Email 609 ); 610 let preferred_channel = match row.channel { 611 crate::comms::CommsChannel::Email => "email", 612 crate::comms::CommsChannel::Discord => "discord", 613 crate::comms::CommsChannel::Telegram => "telegram", 614 crate::comms::CommsChannel::Signal => "signal", 615 }; 616 Json(ConfirmSignupOutput { 617 access_jwt: access_meta.token, 618 refresh_jwt: refresh_meta.token, 619 handle: row.handle, 620 did: row.did, 621 email: row.email, 622 email_verified, 623 preferred_channel: preferred_channel.to_string(), 624 preferred_channel_verified: true, 625 }) 626 .into_response() 627} 628 629#[derive(Deserialize)] 630#[serde(rename_all = "camelCase")] 631pub struct ResendVerificationInput { 632 pub did: String, 633} 634 635pub async fn resend_verification( 636 State(state): State<AppState>, 637 Json(input): Json<ResendVerificationInput>, 638) -> Response { 639 info!("resend_verification called for DID: {}", input.did); 640 let row = match sqlx::query!( 641 r#"SELECT 642 id, handle, email, 643 preferred_comms_channel as "channel: crate::comms::CommsChannel", 644 discord_id, telegram_username, signal_number, 645 email_verified, discord_verified, telegram_verified, signal_verified 646 FROM users 647 WHERE did = $1"#, 648 input.did 649 ) 650 
.fetch_optional(&state.db) 651 .await 652 { 653 Ok(Some(row)) => row, 654 Ok(None) => { 655 return ApiError::InvalidRequest("User not found".into()).into_response(); 656 } 657 Err(e) => { 658 error!("Database error in resend_verification: {:?}", e); 659 return ApiError::InternalError.into_response(); 660 } 661 }; 662 let is_verified = 663 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 664 if is_verified { 665 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 666 } 667 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 668 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 669 670 let email = row.email.clone(); 671 672 if let Err(e) = sqlx::query!( 673 r#" 674 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 675 VALUES ($1, 'email', $2, $3, $4) 676 ON CONFLICT (user_id, channel) DO UPDATE 677 SET code = $2, pending_identifier = $3, expires_at = $4, created_at = NOW() 678 "#, 679 row.id, 680 verification_code, 681 email, 682 code_expires_at 683 ) 684 .execute(&state.db) 685 .await 686 { 687 error!("Failed to update verification code: {:?}", e); 688 return ApiError::InternalError.into_response(); 689 } 690 let (channel_str, recipient) = match row.channel { 691 crate::comms::CommsChannel::Email => { 692 ("email", row.email.unwrap_or_default()) 693 } 694 crate::comms::CommsChannel::Discord => { 695 ("discord", row.discord_id.unwrap_or_default()) 696 } 697 crate::comms::CommsChannel::Telegram => { 698 ("telegram", row.telegram_username.unwrap_or_default()) 699 } 700 crate::comms::CommsChannel::Signal => { 701 ("signal", row.signal_number.unwrap_or_default()) 702 } 703 }; 704 if let Err(e) = crate::comms::enqueue_signup_verification( 705 &state.db, 706 row.id, 707 channel_str, 708 &recipient, 709 &verification_code, 710 ) 711 .await 712 { 713 warn!("Failed to enqueue verification notification: {:?}", e); 714 } 
715 Json(json!({"success": true})).into_response() 716} 717 718#[derive(Serialize)] 719#[serde(rename_all = "camelCase")] 720pub struct SessionInfo { 721 pub id: String, 722 pub created_at: String, 723 pub expires_at: String, 724 pub is_current: bool, 725} 726 727#[derive(Serialize)] 728#[serde(rename_all = "camelCase")] 729pub struct ListSessionsOutput { 730 pub sessions: Vec<SessionInfo>, 731} 732 733pub async fn list_sessions( 734 State(state): State<AppState>, 735 headers: HeaderMap, 736 auth: BearerAuth, 737) -> Response { 738 let current_jti = headers 739 .get("authorization") 740 .and_then(|v| v.to_str().ok()) 741 .and_then(|v| v.strip_prefix("Bearer ")) 742 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 743 let result = sqlx::query_as::<_, (i32, String, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>( 744 r#" 745 SELECT id, access_jti, created_at, refresh_expires_at 746 FROM session_tokens 747 WHERE did = $1 AND refresh_expires_at > NOW() 748 ORDER BY created_at DESC 749 "#, 750 ) 751 .bind(&auth.0.did) 752 .fetch_all(&state.db) 753 .await; 754 match result { 755 Ok(rows) => { 756 let sessions: Vec<SessionInfo> = rows 757 .into_iter() 758 .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 759 id: id.to_string(), 760 created_at: created_at.to_rfc3339(), 761 expires_at: expires_at.to_rfc3339(), 762 is_current: current_jti.as_ref().map_or(false, |j| j == &access_jti), 763 }) 764 .collect(); 765 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 766 } 767 Err(e) => { 768 error!("DB error in list_sessions: {:?}", e); 769 ( 770 StatusCode::INTERNAL_SERVER_ERROR, 771 Json(json!({"error": "InternalError"})), 772 ) 773 .into_response() 774 } 775 } 776} 777 778#[derive(Deserialize)] 779#[serde(rename_all = "camelCase")] 780pub struct RevokeSessionInput { 781 pub session_id: String, 782} 783 784pub async fn revoke_session( 785 State(state): State<AppState>, 786 auth: BearerAuth, 787 Json(input): 
Json<RevokeSessionInput>, 788) -> Response { 789 let session_id: i32 = match input.session_id.parse() { 790 Ok(id) => id, 791 Err(_) => { 792 return ( 793 StatusCode::BAD_REQUEST, 794 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 795 ) 796 .into_response(); 797 } 798 }; 799 let session = sqlx::query_as::<_, (String,)>( 800 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 801 ) 802 .bind(session_id) 803 .bind(&auth.0.did) 804 .fetch_optional(&state.db) 805 .await; 806 let access_jti = match session { 807 Ok(Some((jti,))) => jti, 808 Ok(None) => { 809 return ( 810 StatusCode::NOT_FOUND, 811 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 812 ) 813 .into_response(); 814 } 815 Err(e) => { 816 error!("DB error in revoke_session: {:?}", e); 817 return ( 818 StatusCode::INTERNAL_SERVER_ERROR, 819 Json(json!({"error": "InternalError"})), 820 ) 821 .into_response(); 822 } 823 }; 824 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 825 .bind(session_id) 826 .execute(&state.db) 827 .await 828 { 829 error!("DB error deleting session: {:?}", e); 830 return ( 831 StatusCode::INTERNAL_SERVER_ERROR, 832 Json(json!({"error": "InternalError"})), 833 ) 834 .into_response(); 835 } 836 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 837 if let Err(e) = state.cache.delete(&cache_key).await { 838 warn!("Failed to invalidate session cache: {:?}", e); 839 } 840 info!(did = %auth.0.did, session_id = %session_id, "Session revoked"); 841 (StatusCode::OK, Json(json!({}))).into_response() 842}