this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() 20 { 21 return first_ip.trim().to_string(); 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") 24 && let Ok(value) = real_ip.to_str() 25 { 26 return value.trim().to_string(); 27 } 28 "unknown".to_string() 29} 30 31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 32 let identifier = identifier.trim(); 33 if identifier.contains('@') || identifier.starts_with("did:") { 34 identifier.to_string() 35 } else if !identifier.contains('.') { 36 format!("{}.{}", identifier.to_lowercase(), pds_hostname) 37 } else { 38 identifier.to_lowercase() 39 } 40} 41 42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String { 43 stored_handle.to_string() 44} 45 46#[derive(Deserialize)] 47pub struct CreateSessionInput { 48 pub identifier: String, 49 pub password: String, 50} 51 52#[derive(Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct CreateSessionOutput { 55 pub access_jwt: String, 56 pub refresh_jwt: String, 57 pub handle: String, 58 pub did: String, 59} 60 61pub async fn create_session( 62 State(state): State<AppState>, 63 headers: HeaderMap, 64 Json(input): Json<CreateSessionInput>, 65) -> Response { 66 info!("create_session called with identifier: {}", input.identifier); 67 let client_ip = extract_client_ip(&headers); 68 if !state 69 .check_rate_limit(RateLimitKind::Login, &client_ip) 70 .await 71 { 72 warn!(ip = %client_ip, "Login rate limit exceeded"); 73 return ( 74 StatusCode::TOO_MANY_REQUESTS, 75 Json(json!({ 76 "error": "RateLimitExceeded", 77 "message": "Too many login attempts. Please try again later." 78 })), 79 ) 80 .into_response(); 81 } 82 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 83 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 84 info!("Normalized identifier: {} -> {}", input.identifier, normalized_identifier); 85 let row = match sqlx::query!( 86 r#"SELECT 87 u.id, u.did, u.handle, u.password_hash, 88 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 89 k.key_bytes, k.encryption_version 90 FROM users u 91 JOIN user_keys k ON u.id = k.user_id 92 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 93 normalized_identifier 94 ) 95 .fetch_optional(&state.db) 96 .await 97 { 98 Ok(Some(row)) => row, 99 Ok(None) => { 100 let _ = verify( 101 &input.password, 102 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 103 ); 104 warn!("User not found for login attempt"); 105 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 106 .into_response(); 107 } 108 Err(e) => { 109 error!("Database error fetching user: {:?}", e); 110 return ApiError::InternalError.into_response(); 111 } 112 }; 113 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 114 Ok(k) => k, 115 Err(e) => { 116 error!("Failed to decrypt user key: {:?}", e); 117 return ApiError::InternalError.into_response(); 118 } 119 }; 120 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 121 true 122 } else { 123 let app_passwords = sqlx::query!( 124 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 125 row.id 126 ) 127 .fetch_all(&state.db) 128 .await 129 .unwrap_or_default(); 130 app_passwords 131 .iter() 132 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 133 }; 134 if !password_valid { 135 warn!("Password verification failed for login attempt"); 136 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 137 .into_response(); 138 } 139 let is_verified = 140 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 141 if !is_verified { 142 warn!("Login attempt for unverified account: {}", row.did); 143 return ( 144 StatusCode::FORBIDDEN, 145 Json(json!({ 146 "error": "AccountNotVerified", 147 "message": "Please verify your account before logging in", 148 "did": row.did 149 })), 150 ) 151 .into_response(); 152 } 153 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 154 Ok(m) => m, 155 Err(e) => { 156 error!("Failed to create access token: {:?}", e); 157 return ApiError::InternalError.into_response(); 158 } 159 }; 160 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 161 Ok(m) => m, 162 Err(e) => { 163 error!("Failed to create refresh token: {:?}", e); 164 return ApiError::InternalError.into_response(); 165 } 166 }; 167 if let Err(e) = sqlx::query!( 168 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 169 row.did, 170 access_meta.jti, 171 refresh_meta.jti, 172 access_meta.expires_at, 173 refresh_meta.expires_at 174 ) 175 .execute(&state.db) 176 .await 177 { 178 error!("Failed to insert session: {:?}", e); 179 return ApiError::InternalError.into_response(); 180 } 181 let handle = full_handle(&row.handle, &pds_hostname); 182 Json(CreateSessionOutput { 183 access_jwt: access_meta.token, 184 refresh_jwt: refresh_meta.token, 185 handle, 186 did: row.did, 187 }) 188 .into_response() 189} 190 191pub async fn get_session( 192 State(state): State<AppState>, 193 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 194) -> Response { 195 let permissions = auth_user.permissions(); 196 let can_read_email = permissions.allows_email_read(); 197 198 match sqlx::query!( 199 r#"SELECT 200 handle, email, email_verified, is_admin, deactivated_at, 201 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 202 discord_verified, telegram_verified, signal_verified 203 FROM users WHERE did = $1"#, 204 auth_user.did 205 ) 206 .fetch_optional(&state.db) 207 .await 208 { 209 Ok(Some(row)) => { 210 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 211 crate::comms::CommsChannel::Email => ("email", row.email_verified), 212 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 213 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 214 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 215 }; 216 let pds_hostname = 217 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 218 let handle = full_handle(&row.handle, &pds_hostname); 219 let is_active = row.deactivated_at.is_none(); 220 let email_value = if can_read_email { 221 row.email.clone() 222 } else { 223 None 224 }; 225 let email_verified_value = can_read_email && row.email_verified; 226 Json(json!({ 227 "handle": handle, 228 "did": auth_user.did, 229 "email": email_value, 230 "emailVerified": email_verified_value, 231 "preferredChannel": preferred_channel, 232 "preferredChannelVerified": preferred_channel_verified, 233 "isAdmin": row.is_admin, 234 "active": is_active, 235 "status": if is_active { "active" } else { "deactivated" }, 236 "didDoc": {} 237 })) 238 .into_response() 239 } 240 Ok(None) => ApiError::AuthenticationFailed.into_response(), 241 Err(e) => { 242 error!("Database error in get_session: {:?}", e); 243 ApiError::InternalError.into_response() 244 } 245 } 246} 247 248pub async fn delete_session( 249 State(state): State<AppState>, 250 headers: axum::http::HeaderMap, 251) -> Response { 252 let token = match crate::auth::extract_bearer_token_from_header( 253 headers.get("Authorization").and_then(|h| h.to_str().ok()), 254 ) { 255 Some(t) => t, 256 None => return ApiError::AuthenticationRequired.into_response(), 257 }; 258 let jti = match crate::auth::get_jti_from_token(&token) { 259 Ok(jti) => jti, 260 Err(_) => return ApiError::AuthenticationFailed.into_response(), 261 }; 262 let did = crate::auth::get_did_from_token(&token).ok(); 263 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 264 .execute(&state.db) 265 .await 266 { 267 Ok(res) if res.rows_affected() > 0 => { 268 if let Some(did) = did { 269 let session_cache_key = format!("auth:session:{}:{}", did, jti); 270 let _ = state.cache.delete(&session_cache_key).await; 271 } 272 Json(json!({})).into_response() 273 } 274 Ok(_) => ApiError::AuthenticationFailed.into_response(), 275 Err(e) => { 276 error!("Database error in delete_session: {:?}", e); 277 ApiError::AuthenticationFailed.into_response() 278 } 279 } 280} 281 282pub async fn refresh_session( 283 State(state): State<AppState>, 284 headers: axum::http::HeaderMap, 285) -> Response { 286 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 287 if !state 288 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 289 .await 290 { 291 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 292 return ( 293 axum::http::StatusCode::TOO_MANY_REQUESTS, 294 axum::Json(serde_json::json!({ 295 "error": "RateLimitExceeded", 296 "message": "Too many requests. Please try again later." 297 })), 298 ) 299 .into_response(); 300 } 301 let refresh_token = match crate::auth::extract_bearer_token_from_header( 302 headers.get("Authorization").and_then(|h| h.to_str().ok()), 303 ) { 304 Some(t) => t, 305 None => return ApiError::AuthenticationRequired.into_response(), 306 }; 307 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 308 Ok(jti) => jti, 309 Err(_) => { 310 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 311 .into_response(); 312 } 313 }; 314 let mut tx = match state.db.begin().await { 315 Ok(tx) => tx, 316 Err(e) => { 317 error!("Failed to begin transaction: {:?}", e); 318 return ApiError::InternalError.into_response(); 319 } 320 }; 321 if let Ok(Some(session_id)) = sqlx::query_scalar!( 322 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 323 refresh_jti 324 ) 325 .fetch_optional(&mut *tx) 326 .await 327 { 328 warn!( 329 "Refresh token reuse detected! Revoking token family for session_id: {}", 330 session_id 331 ); 332 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 333 .execute(&mut *tx) 334 .await; 335 let _ = tx.commit().await; 336 return ApiError::ExpiredTokenMsg( 337 "Refresh token has been revoked due to suspected compromise".into(), 338 ) 339 .into_response(); 340 } 341 let session_row = match sqlx::query!( 342 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 343 FROM session_tokens st 344 JOIN users u ON st.did = u.did 345 JOIN user_keys k ON u.id = k.user_id 346 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 347 FOR UPDATE OF st"#, 348 refresh_jti 349 ) 350 .fetch_optional(&mut *tx) 351 .await 352 { 353 Ok(Some(row)) => row, 354 Ok(None) => { 355 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 356 .into_response(); 357 } 358 Err(e) => { 359 error!("Database error fetching session: {:?}", e); 360 return ApiError::InternalError.into_response(); 361 } 362 }; 363 let key_bytes = 364 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 365 Ok(k) => k, 366 Err(e) => { 367 error!("Failed to decrypt user key: {:?}", e); 368 return ApiError::InternalError.into_response(); 369 } 370 }; 371 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 372 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 373 } 374 let new_access_meta = 375 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 376 Ok(m) => m, 377 Err(e) => { 378 error!("Failed to create access token: {:?}", e); 379 return ApiError::InternalError.into_response(); 380 } 381 }; 382 let new_refresh_meta = 383 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 384 Ok(m) => m, 385 Err(e) => { 386 error!("Failed to create refresh token: {:?}", e); 387 return ApiError::InternalError.into_response(); 388 } 389 }; 390 match sqlx::query!( 391 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 392 refresh_jti, 393 session_row.id 394 ) 395 .execute(&mut *tx) 396 .await 397 { 398 Ok(result) if result.rows_affected() == 0 => { 399 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 400 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 401 .execute(&mut *tx) 402 .await; 403 let _ = tx.commit().await; 404 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 405 } 406 Err(e) => { 407 error!("Failed to record used refresh token: {:?}", e); 408 return ApiError::InternalError.into_response(); 409 } 410 Ok(_) => {} 411 } 412 if let Err(e) = sqlx::query!( 413 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 414 new_access_meta.jti, 415 new_refresh_meta.jti, 416 new_access_meta.expires_at, 417 new_refresh_meta.expires_at, 418 session_row.id 419 ) 420 .execute(&mut *tx) 421 .await 422 { 423 error!("Database error updating session: {:?}", e); 424 return ApiError::InternalError.into_response(); 425 } 426 if let Err(e) = tx.commit().await { 427 error!("Failed to commit transaction: {:?}", e); 428 return ApiError::InternalError.into_response(); 429 } 430 match sqlx::query!( 431 r#"SELECT 432 handle, email, email_verified, is_admin, 433 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 434 discord_verified, telegram_verified, signal_verified 435 FROM users WHERE did = $1"#, 436 session_row.did 437 ) 438 .fetch_optional(&state.db) 439 .await 440 { 441 Ok(Some(u)) => { 442 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 443 crate::comms::CommsChannel::Email => ("email", u.email_verified), 444 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 445 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 446 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 447 }; 448 let pds_hostname = 449 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 450 let handle = full_handle(&u.handle, &pds_hostname); 451 Json(json!({ 452 "accessJwt": new_access_meta.token, 453 "refreshJwt": new_refresh_meta.token, 454 "handle": handle, 455 "did": session_row.did, 456 "email": u.email, 457 "emailVerified": u.email_verified, 458 "preferredChannel": preferred_channel, 459 "preferredChannelVerified": preferred_channel_verified, 460 "isAdmin": u.is_admin, 461 "active": true 462 })) 463 .into_response() 464 } 465 Ok(None) => { 466 error!("User not found for existing session: {}", session_row.did); 467 ApiError::InternalError.into_response() 468 } 469 Err(e) => { 470 error!("Database error fetching user: {:?}", e); 471 ApiError::InternalError.into_response() 472 } 473 } 474} 475 476#[derive(Deserialize)] 477#[serde(rename_all = "camelCase")] 478pub struct ConfirmSignupInput { 479 pub did: String, 480 pub verification_code: String, 481} 482 483#[derive(Serialize)] 484#[serde(rename_all = "camelCase")] 485pub struct ConfirmSignupOutput { 486 pub access_jwt: String, 487 pub refresh_jwt: String, 488 pub handle: String, 489 pub did: String, 490 pub email: Option<String>, 491 pub email_verified: bool, 492 pub preferred_channel: String, 493 pub preferred_channel_verified: bool, 494} 495 496pub async fn confirm_signup( 497 State(state): State<AppState>, 498 Json(input): Json<ConfirmSignupInput>, 499) -> Response { 500 info!("confirm_signup called for DID: {}", input.did); 501 let row = match sqlx::query!( 502 r#"SELECT 503 u.id, u.did, u.handle, u.email, 504 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 505 k.key_bytes, k.encryption_version 506 FROM users u 507 JOIN user_keys k ON u.id = k.user_id 508 WHERE u.did = $1"#, 509 input.did 510 ) 511 .fetch_optional(&state.db) 512 .await 513 { 514 Ok(Some(row)) => row, 515 Ok(None) => { 516 warn!("User not found for confirm_signup: {}", input.did); 517 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 518 .into_response(); 519 } 520 Err(e) => { 521 error!("Database error in confirm_signup: {:?}", e); 522 return ApiError::InternalError.into_response(); 523 } 524 }; 525 526 let verification = match sqlx::query!( 527 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 528 row.id 529 ) 530 .fetch_optional(&state.db) 531 .await 532 { 533 Ok(Some(v)) => v, 534 Ok(None) => { 535 warn!("No verification code found for user: {}", input.did); 536 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 537 } 538 Err(e) => { 539 error!("Database error fetching verification: {:?}", e); 540 return ApiError::InternalError.into_response(); 541 } 542 }; 543 544 if verification.code != input.verification_code { 545 warn!("Invalid verification code for user: {}", input.did); 546 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 547 } 548 if verification.expires_at < Utc::now() { 549 warn!("Verification code expired for user: {}", input.did); 550 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 551 } 552 553 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 554 Ok(k) => k, 555 Err(e) => { 556 error!("Failed to decrypt user key: {:?}", e); 557 return ApiError::InternalError.into_response(); 558 } 559 }; 560 let verified_column = match row.channel { 561 crate::comms::CommsChannel::Email => "email_verified", 562 crate::comms::CommsChannel::Discord => "discord_verified", 563 crate::comms::CommsChannel::Telegram => "telegram_verified", 564 crate::comms::CommsChannel::Signal => "signal_verified", 565 }; 566 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 567 if let Err(e) = sqlx::query(&update_query) 568 .bind(&input.did) 569 .execute(&state.db) 570 .await 571 { 572 error!("Failed to update verification status: {:?}", e); 573 return ApiError::InternalError.into_response(); 574 } 575 576 if let Err(e) = sqlx::query!( 577 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 578 row.id 579 ) 580 .execute(&state.db) 581 .await 582 { 583 error!("Failed to delete verification record: {:?}", e); 584 } 585 586 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 587 Ok(m) => m, 588 Err(e) => { 589 error!("Failed to create access token: {:?}", e); 590 return ApiError::InternalError.into_response(); 591 } 592 }; 593 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 594 Ok(m) => m, 595 Err(e) => { 596 error!("Failed to create refresh token: {:?}", e); 597 return ApiError::InternalError.into_response(); 598 } 599 }; 600 if let Err(e) = sqlx::query!( 601 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 602 row.did, 603 access_meta.jti, 604 refresh_meta.jti, 605 access_meta.expires_at, 606 refresh_meta.expires_at 607 ) 608 .execute(&state.db) 609 .await 610 { 611 error!("Failed to insert session: {:?}", e); 612 return ApiError::InternalError.into_response(); 613 } 614 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 615 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 616 warn!("Failed to enqueue welcome notification: {:?}", e); 617 } 618 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 619 let preferred_channel = match row.channel { 620 crate::comms::CommsChannel::Email => "email", 621 crate::comms::CommsChannel::Discord => "discord", 622 crate::comms::CommsChannel::Telegram => "telegram", 623 crate::comms::CommsChannel::Signal => "signal", 624 }; 625 Json(ConfirmSignupOutput { 626 access_jwt: access_meta.token, 627 refresh_jwt: refresh_meta.token, 628 handle: row.handle, 629 did: row.did, 630 email: row.email, 631 email_verified, 632 preferred_channel: preferred_channel.to_string(), 633 preferred_channel_verified: true, 634 }) 635 .into_response() 636} 637 638#[derive(Deserialize)] 639#[serde(rename_all = "camelCase")] 640pub struct ResendVerificationInput { 641 pub did: String, 642} 643 644pub async fn resend_verification( 645 State(state): State<AppState>, 646 Json(input): Json<ResendVerificationInput>, 647) -> Response { 648 info!("resend_verification called for DID: {}", input.did); 649 let row = match sqlx::query!( 650 r#"SELECT 651 id, handle, email, 652 preferred_comms_channel as "channel: crate::comms::CommsChannel", 653 discord_id, telegram_username, signal_number, 654 email_verified, discord_verified, telegram_verified, signal_verified 655 FROM users 656 WHERE did = $1"#, 657 input.did 658 ) 659 .fetch_optional(&state.db) 660 .await 661 { 662 Ok(Some(row)) => row, 663 Ok(None) => { 664 return ApiError::InvalidRequest("User not found".into()).into_response(); 665 } 666 Err(e) => { 667 error!("Database error in resend_verification: {:?}", e); 668 return ApiError::InternalError.into_response(); 669 } 670 }; 671 let is_verified = 672 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 673 if is_verified { 674 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 675 } 676 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 677 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 678 679 let email = row.email.clone(); 680 681 if let Err(e) = sqlx::query!( 682 r#" 683 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 684 VALUES ($1, 'email', $2, $3, $4) 685 ON CONFLICT (user_id, channel) DO UPDATE 686 SET code = $2, pending_identifier = $3, expires_at = $4, created_at = NOW() 687 "#, 688 row.id, 689 verification_code, 690 email, 691 code_expires_at 692 ) 693 .execute(&state.db) 694 .await 695 { 696 error!("Failed to update verification code: {:?}", e); 697 return ApiError::InternalError.into_response(); 698 } 699 let (channel_str, recipient) = match row.channel { 700 crate::comms::CommsChannel::Email => ("email", row.email.unwrap_or_default()), 701 crate::comms::CommsChannel::Discord => ("discord", row.discord_id.unwrap_or_default()), 702 crate::comms::CommsChannel::Telegram => { 703 ("telegram", row.telegram_username.unwrap_or_default()) 704 } 705 crate::comms::CommsChannel::Signal => ("signal", row.signal_number.unwrap_or_default()), 706 }; 707 if let Err(e) = crate::comms::enqueue_signup_verification( 708 &state.db, 709 row.id, 710 channel_str, 711 &recipient, 712 &verification_code, 713 ) 714 .await 715 { 716 warn!("Failed to enqueue verification notification: {:?}", e); 717 } 718 Json(json!({"success": true})).into_response() 719} 720 721#[derive(Serialize)] 722#[serde(rename_all = "camelCase")] 723pub struct SessionInfo { 724 pub id: String, 725 pub created_at: String, 726 pub expires_at: String, 727 pub is_current: bool, 728} 729 730#[derive(Serialize)] 731#[serde(rename_all = "camelCase")] 732pub struct ListSessionsOutput { 733 pub sessions: Vec<SessionInfo>, 734} 735 736pub async fn list_sessions( 737 State(state): State<AppState>, 738 headers: HeaderMap, 739 auth: BearerAuth, 740) -> Response { 741 let current_jti = headers 742 .get("authorization") 743 .and_then(|v| v.to_str().ok()) 744 .and_then(|v| v.strip_prefix("Bearer ")) 745 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 746 let result = sqlx::query_as::< 747 _, 748 ( 749 i32, 750 String, 751 chrono::DateTime<chrono::Utc>, 752 chrono::DateTime<chrono::Utc>, 753 ), 754 >( 755 r#" 756 SELECT id, access_jti, created_at, refresh_expires_at 757 FROM session_tokens 758 WHERE did = $1 AND refresh_expires_at > NOW() 759 ORDER BY created_at DESC 760 "#, 761 ) 762 .bind(&auth.0.did) 763 .fetch_all(&state.db) 764 .await; 765 match result { 766 Ok(rows) => { 767 let sessions: Vec<SessionInfo> = rows 768 .into_iter() 769 .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 770 id: id.to_string(), 771 created_at: created_at.to_rfc3339(), 772 expires_at: expires_at.to_rfc3339(), 773 is_current: current_jti.as_ref() == Some(&access_jti), 774 }) 775 .collect(); 776 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 777 } 778 Err(e) => { 779 error!("DB error in list_sessions: {:?}", e); 780 ( 781 StatusCode::INTERNAL_SERVER_ERROR, 782 Json(json!({"error": "InternalError"})), 783 ) 784 .into_response() 785 } 786 } 787} 788 789#[derive(Deserialize)] 790#[serde(rename_all = "camelCase")] 791pub struct RevokeSessionInput { 792 pub session_id: String, 793} 794 795pub async fn revoke_session( 796 State(state): State<AppState>, 797 auth: BearerAuth, 798 Json(input): Json<RevokeSessionInput>, 799) -> Response { 800 let session_id: i32 = match input.session_id.parse() { 801 Ok(id) => id, 802 Err(_) => { 803 return ( 804 StatusCode::BAD_REQUEST, 805 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 806 ) 807 .into_response(); 808 } 809 }; 810 let session = sqlx::query_as::<_, (String,)>( 811 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 812 ) 813 .bind(session_id) 814 .bind(&auth.0.did) 815 .fetch_optional(&state.db) 816 .await; 817 let access_jti = match session { 818 Ok(Some((jti,))) => jti, 819 Ok(None) => { 820 return ( 821 StatusCode::NOT_FOUND, 822 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 823 ) 824 .into_response(); 825 } 826 Err(e) => { 827 error!("DB error in revoke_session: {:?}", e); 828 return ( 829 StatusCode::INTERNAL_SERVER_ERROR, 830 Json(json!({"error": "InternalError"})), 831 ) 832 .into_response(); 833 } 834 }; 835 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 836 .bind(session_id) 837 .execute(&state.db) 838 .await 839 { 840 error!("DB error deleting session: {:?}", e); 841 return ( 842 StatusCode::INTERNAL_SERVER_ERROR, 843 Json(json!({"error": "InternalError"})), 844 ) 845 .into_response(); 846 } 847 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 848 if let Err(e) = state.cache.delete(&cache_key).await { 849 warn!("Failed to invalidate session cache: {:?}", e); 850 } 851 info!(did = %auth.0.did, session_id = %session_id, "Session revoked"); 852 (StatusCode::OK, Json(json!({}))).into_response() 853}