this repo has no description
1use crate::api::ApiError; 2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated}; 3use crate::state::{AppState, RateLimitKind}; 4use axum::{ 5 Json, 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use bcrypt::verify; 11use chrono::Utc; 12use serde::{Deserialize, Serialize}; 13use serde_json::json; 14use tracing::{error, info, warn}; 15 16fn extract_client_ip(headers: &HeaderMap) -> String { 17 if let Some(forwarded) = headers.get("x-forwarded-for") 18 && let Ok(value) = forwarded.to_str() 19 && let Some(first_ip) = value.split(',').next() 20 { 21 return first_ip.trim().to_string(); 22 } 23 if let Some(real_ip) = headers.get("x-real-ip") 24 && let Ok(value) = real_ip.to_str() 25 { 26 return value.trim().to_string(); 27 } 28 "unknown".to_string() 29} 30 31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String { 32 let suffix = format!(".{}", pds_hostname); 33 if identifier.ends_with(&suffix) { 34 identifier[..identifier.len() - suffix.len()].to_string() 35 } else { 36 identifier.to_string() 37 } 38} 39 40fn full_handle(stored_handle: &str, pds_hostname: &str) -> String { 41 let suffix = format!(".{}", pds_hostname); 42 if stored_handle.ends_with(&suffix) || stored_handle.ends_with(pds_hostname) { 43 stored_handle.to_string() 44 } else { 45 format!("{}.{}", stored_handle, pds_hostname) 46 } 47} 48 49#[derive(Deserialize)] 50pub struct CreateSessionInput { 51 pub identifier: String, 52 pub password: String, 53} 54 55#[derive(Serialize)] 56#[serde(rename_all = "camelCase")] 57pub struct CreateSessionOutput { 58 pub access_jwt: String, 59 pub refresh_jwt: String, 60 pub handle: String, 61 pub did: String, 62} 63 64pub async fn create_session( 65 State(state): State<AppState>, 66 headers: HeaderMap, 67 Json(input): Json<CreateSessionInput>, 68) -> Response { 69 info!("create_session called"); 70 let client_ip = extract_client_ip(&headers); 71 if !state 72 .check_rate_limit(RateLimitKind::Login, &client_ip) 73 .await 74 { 75 warn!(ip = %client_ip, "Login rate limit exceeded"); 76 return ( 77 StatusCode::TOO_MANY_REQUESTS, 78 Json(json!({ 79 "error": "RateLimitExceeded", 80 "message": "Too many login attempts. Please try again later." 81 })), 82 ) 83 .into_response(); 84 } 85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname); 87 let row = match sqlx::query!( 88 r#"SELECT 89 u.id, u.did, u.handle, u.password_hash, 90 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified, 91 k.key_bytes, k.encryption_version 92 FROM users u 93 JOIN user_keys k ON u.id = k.user_id 94 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#, 95 normalized_identifier 96 ) 97 .fetch_optional(&state.db) 98 .await 99 { 100 Ok(Some(row)) => row, 101 Ok(None) => { 102 let _ = verify( 103 &input.password, 104 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK", 105 ); 106 warn!("User not found for login attempt"); 107 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 108 .into_response(); 109 } 110 Err(e) => { 111 error!("Database error fetching user: {:?}", e); 112 return ApiError::InternalError.into_response(); 113 } 114 }; 115 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 116 Ok(k) => k, 117 Err(e) => { 118 error!("Failed to decrypt user key: {:?}", e); 119 return ApiError::InternalError.into_response(); 120 } 121 }; 122 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) { 123 true 124 } else { 125 let app_passwords = sqlx::query!( 126 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", 127 row.id 128 ) 129 .fetch_all(&state.db) 130 .await 131 .unwrap_or_default(); 132 app_passwords 133 .iter() 134 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false)) 135 }; 136 if !password_valid { 137 warn!("Password verification failed for login attempt"); 138 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()) 139 .into_response(); 140 } 141 let is_verified = 142 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 143 if !is_verified { 144 warn!("Login attempt for unverified account: {}", row.did); 145 return ( 146 StatusCode::FORBIDDEN, 147 Json(json!({ 148 "error": "AccountNotVerified", 149 "message": "Please verify your account before logging in", 150 "did": row.did 151 })), 152 ) 153 .into_response(); 154 } 155 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 156 Ok(m) => m, 157 Err(e) => { 158 error!("Failed to create access token: {:?}", e); 159 return ApiError::InternalError.into_response(); 160 } 161 }; 162 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 163 Ok(m) => m, 164 Err(e) => { 165 error!("Failed to create refresh token: {:?}", e); 166 return ApiError::InternalError.into_response(); 167 } 168 }; 169 if let Err(e) = sqlx::query!( 170 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 171 row.did, 172 access_meta.jti, 173 refresh_meta.jti, 174 access_meta.expires_at, 175 refresh_meta.expires_at 176 ) 177 .execute(&state.db) 178 .await 179 { 180 error!("Failed to insert session: {:?}", e); 181 return ApiError::InternalError.into_response(); 182 } 183 let handle = full_handle(&row.handle, &pds_hostname); 184 Json(CreateSessionOutput { 185 access_jwt: access_meta.token, 186 refresh_jwt: refresh_meta.token, 187 handle, 188 did: row.did, 189 }) 190 .into_response() 191} 192 193pub async fn get_session( 194 State(state): State<AppState>, 195 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated, 196) -> Response { 197 let permissions = auth_user.permissions(); 198 let can_read_email = permissions.allows_email_read(); 199 200 match sqlx::query!( 201 r#"SELECT 202 handle, email, email_verified, is_admin, deactivated_at, 203 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 204 discord_verified, telegram_verified, signal_verified 205 FROM users WHERE did = $1"#, 206 auth_user.did 207 ) 208 .fetch_optional(&state.db) 209 .await 210 { 211 Ok(Some(row)) => { 212 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel { 213 crate::comms::CommsChannel::Email => ("email", row.email_verified), 214 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified), 215 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified), 216 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified), 217 }; 218 let pds_hostname = 219 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 220 let handle = full_handle(&row.handle, &pds_hostname); 221 let is_active = row.deactivated_at.is_none(); 222 let email_value = if can_read_email { 223 row.email.clone() 224 } else { 225 None 226 }; 227 let email_verified_value = can_read_email && row.email_verified; 228 Json(json!({ 229 "handle": handle, 230 "did": auth_user.did, 231 "email": email_value, 232 "emailVerified": email_verified_value, 233 "preferredChannel": preferred_channel, 234 "preferredChannelVerified": preferred_channel_verified, 235 "isAdmin": row.is_admin, 236 "active": is_active, 237 "status": if is_active { "active" } else { "deactivated" }, 238 "didDoc": {} 239 })) 240 .into_response() 241 } 242 Ok(None) => ApiError::AuthenticationFailed.into_response(), 243 Err(e) => { 244 error!("Database error in get_session: {:?}", e); 245 ApiError::InternalError.into_response() 246 } 247 } 248} 249 250pub async fn delete_session( 251 State(state): State<AppState>, 252 headers: axum::http::HeaderMap, 253) -> Response { 254 let token = match crate::auth::extract_bearer_token_from_header( 255 headers.get("Authorization").and_then(|h| h.to_str().ok()), 256 ) { 257 Some(t) => t, 258 None => return ApiError::AuthenticationRequired.into_response(), 259 }; 260 let jti = match crate::auth::get_jti_from_token(&token) { 261 Ok(jti) => jti, 262 Err(_) => return ApiError::AuthenticationFailed.into_response(), 263 }; 264 let did = crate::auth::get_did_from_token(&token).ok(); 265 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti) 266 .execute(&state.db) 267 .await 268 { 269 Ok(res) if res.rows_affected() > 0 => { 270 if let Some(did) = did { 271 let session_cache_key = format!("auth:session:{}:{}", did, jti); 272 let _ = state.cache.delete(&session_cache_key).await; 273 } 274 Json(json!({})).into_response() 275 } 276 Ok(_) => ApiError::AuthenticationFailed.into_response(), 277 Err(e) => { 278 error!("Database error in delete_session: {:?}", e); 279 ApiError::AuthenticationFailed.into_response() 280 } 281 } 282} 283 284pub async fn refresh_session( 285 State(state): State<AppState>, 286 headers: axum::http::HeaderMap, 287) -> Response { 288 let client_ip = crate::rate_limit::extract_client_ip(&headers, None); 289 if !state 290 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip) 291 .await 292 { 293 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded"); 294 return ( 295 axum::http::StatusCode::TOO_MANY_REQUESTS, 296 axum::Json(serde_json::json!({ 297 "error": "RateLimitExceeded", 298 "message": "Too many requests. Please try again later." 299 })), 300 ) 301 .into_response(); 302 } 303 let refresh_token = match crate::auth::extract_bearer_token_from_header( 304 headers.get("Authorization").and_then(|h| h.to_str().ok()), 305 ) { 306 Some(t) => t, 307 None => return ApiError::AuthenticationRequired.into_response(), 308 }; 309 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) { 310 Ok(jti) => jti, 311 Err(_) => { 312 return ApiError::AuthenticationFailedMsg("Invalid token format".into()) 313 .into_response(); 314 } 315 }; 316 let mut tx = match state.db.begin().await { 317 Ok(tx) => tx, 318 Err(e) => { 319 error!("Failed to begin transaction: {:?}", e); 320 return ApiError::InternalError.into_response(); 321 } 322 }; 323 if let Ok(Some(session_id)) = sqlx::query_scalar!( 324 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE", 325 refresh_jti 326 ) 327 .fetch_optional(&mut *tx) 328 .await 329 { 330 warn!( 331 "Refresh token reuse detected! Revoking token family for session_id: {}", 332 session_id 333 ); 334 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id) 335 .execute(&mut *tx) 336 .await; 337 let _ = tx.commit().await; 338 return ApiError::ExpiredTokenMsg( 339 "Refresh token has been revoked due to suspected compromise".into(), 340 ) 341 .into_response(); 342 } 343 let session_row = match sqlx::query!( 344 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version 345 FROM session_tokens st 346 JOIN users u ON st.did = u.did 347 JOIN user_keys k ON u.id = k.user_id 348 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW() 349 FOR UPDATE OF st"#, 350 refresh_jti 351 ) 352 .fetch_optional(&mut *tx) 353 .await 354 { 355 Ok(Some(row)) => row, 356 Ok(None) => { 357 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()) 358 .into_response(); 359 } 360 Err(e) => { 361 error!("Database error fetching session: {:?}", e); 362 return ApiError::InternalError.into_response(); 363 } 364 }; 365 let key_bytes = 366 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) { 367 Ok(k) => k, 368 Err(e) => { 369 error!("Failed to decrypt user key: {:?}", e); 370 return ApiError::InternalError.into_response(); 371 } 372 }; 373 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() { 374 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(); 375 } 376 let new_access_meta = 377 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) { 378 Ok(m) => m, 379 Err(e) => { 380 error!("Failed to create access token: {:?}", e); 381 return ApiError::InternalError.into_response(); 382 } 383 }; 384 let new_refresh_meta = 385 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) { 386 Ok(m) => m, 387 Err(e) => { 388 error!("Failed to create refresh token: {:?}", e); 389 return ApiError::InternalError.into_response(); 390 } 391 }; 392 match sqlx::query!( 393 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING", 394 refresh_jti, 395 session_row.id 396 ) 397 .execute(&mut *tx) 398 .await 399 { 400 Ok(result) if result.rows_affected() == 0 => { 401 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id); 402 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id) 403 .execute(&mut *tx) 404 .await; 405 let _ = tx.commit().await; 406 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response(); 407 } 408 Err(e) => { 409 error!("Failed to record used refresh token: {:?}", e); 410 return ApiError::InternalError.into_response(); 411 } 412 Ok(_) => {} 413 } 414 if let Err(e) = sqlx::query!( 415 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5", 416 new_access_meta.jti, 417 new_refresh_meta.jti, 418 new_access_meta.expires_at, 419 new_refresh_meta.expires_at, 420 session_row.id 421 ) 422 .execute(&mut *tx) 423 .await 424 { 425 error!("Database error updating session: {:?}", e); 426 return ApiError::InternalError.into_response(); 427 } 428 if let Err(e) = tx.commit().await { 429 error!("Failed to commit transaction: {:?}", e); 430 return ApiError::InternalError.into_response(); 431 } 432 match sqlx::query!( 433 r#"SELECT 434 handle, email, email_verified, is_admin, 435 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel", 436 discord_verified, telegram_verified, signal_verified 437 FROM users WHERE did = $1"#, 438 session_row.did 439 ) 440 .fetch_optional(&state.db) 441 .await 442 { 443 Ok(Some(u)) => { 444 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel { 445 crate::comms::CommsChannel::Email => ("email", u.email_verified), 446 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified), 447 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified), 448 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified), 449 }; 450 let pds_hostname = 451 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 452 let handle = full_handle(&u.handle, &pds_hostname); 453 Json(json!({ 454 "accessJwt": new_access_meta.token, 455 "refreshJwt": new_refresh_meta.token, 456 "handle": handle, 457 "did": session_row.did, 458 "email": u.email, 459 "emailVerified": u.email_verified, 460 "preferredChannel": preferred_channel, 461 "preferredChannelVerified": preferred_channel_verified, 462 "isAdmin": u.is_admin, 463 "active": true 464 })) 465 .into_response() 466 } 467 Ok(None) => { 468 error!("User not found for existing session: {}", session_row.did); 469 ApiError::InternalError.into_response() 470 } 471 Err(e) => { 472 error!("Database error fetching user: {:?}", e); 473 ApiError::InternalError.into_response() 474 } 475 } 476} 477 478#[derive(Deserialize)] 479#[serde(rename_all = "camelCase")] 480pub struct ConfirmSignupInput { 481 pub did: String, 482 pub verification_code: String, 483} 484 485#[derive(Serialize)] 486#[serde(rename_all = "camelCase")] 487pub struct ConfirmSignupOutput { 488 pub access_jwt: String, 489 pub refresh_jwt: String, 490 pub handle: String, 491 pub did: String, 492 pub email: Option<String>, 493 pub email_verified: bool, 494 pub preferred_channel: String, 495 pub preferred_channel_verified: bool, 496} 497 498pub async fn confirm_signup( 499 State(state): State<AppState>, 500 Json(input): Json<ConfirmSignupInput>, 501) -> Response { 502 info!("confirm_signup called for DID: {}", input.did); 503 let row = match sqlx::query!( 504 r#"SELECT 505 u.id, u.did, u.handle, u.email, 506 u.preferred_comms_channel as "channel: crate::comms::CommsChannel", 507 k.key_bytes, k.encryption_version 508 FROM users u 509 JOIN user_keys k ON u.id = k.user_id 510 WHERE u.did = $1"#, 511 input.did 512 ) 513 .fetch_optional(&state.db) 514 .await 515 { 516 Ok(Some(row)) => row, 517 Ok(None) => { 518 warn!("User not found for confirm_signup: {}", input.did); 519 return ApiError::InvalidRequest("Invalid DID or verification code".into()) 520 .into_response(); 521 } 522 Err(e) => { 523 error!("Database error in confirm_signup: {:?}", e); 524 return ApiError::InternalError.into_response(); 525 } 526 }; 527 528 let verification = match sqlx::query!( 529 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 530 row.id 531 ) 532 .fetch_optional(&state.db) 533 .await 534 { 535 Ok(Some(v)) => v, 536 Ok(None) => { 537 warn!("No verification code found for user: {}", input.did); 538 return ApiError::InvalidRequest("No pending verification".into()).into_response(); 539 } 540 Err(e) => { 541 error!("Database error fetching verification: {:?}", e); 542 return ApiError::InternalError.into_response(); 543 } 544 }; 545 546 if verification.code != input.verification_code { 547 warn!("Invalid verification code for user: {}", input.did); 548 return ApiError::InvalidRequest("Invalid verification code".into()).into_response(); 549 } 550 if verification.expires_at < Utc::now() { 551 warn!("Verification code expired for user: {}", input.did); 552 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response(); 553 } 554 555 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 556 Ok(k) => k, 557 Err(e) => { 558 error!("Failed to decrypt user key: {:?}", e); 559 return ApiError::InternalError.into_response(); 560 } 561 }; 562 let verified_column = match row.channel { 563 crate::comms::CommsChannel::Email => "email_verified", 564 crate::comms::CommsChannel::Discord => "discord_verified", 565 crate::comms::CommsChannel::Telegram => "telegram_verified", 566 crate::comms::CommsChannel::Signal => "signal_verified", 567 }; 568 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 569 if let Err(e) = sqlx::query(&update_query) 570 .bind(&input.did) 571 .execute(&state.db) 572 .await 573 { 574 error!("Failed to update verification status: {:?}", e); 575 return ApiError::InternalError.into_response(); 576 } 577 578 if let Err(e) = sqlx::query!( 579 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'email'", 580 row.id 581 ) 582 .execute(&state.db) 583 .await 584 { 585 error!("Failed to delete verification record: {:?}", e); 586 } 587 588 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 589 Ok(m) => m, 590 Err(e) => { 591 error!("Failed to create access token: {:?}", e); 592 return ApiError::InternalError.into_response(); 593 } 594 }; 595 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) { 596 Ok(m) => m, 597 Err(e) => { 598 error!("Failed to create refresh token: {:?}", e); 599 return ApiError::InternalError.into_response(); 600 } 601 }; 602 if let Err(e) = sqlx::query!( 603 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 604 row.did, 605 access_meta.jti, 606 refresh_meta.jti, 607 access_meta.expires_at, 608 refresh_meta.expires_at 609 ) 610 .execute(&state.db) 611 .await 612 { 613 error!("Failed to insert session: {:?}", e); 614 return ApiError::InternalError.into_response(); 615 } 616 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 617 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 618 warn!("Failed to enqueue welcome notification: {:?}", e); 619 } 620 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email); 621 let preferred_channel = match row.channel { 622 crate::comms::CommsChannel::Email => "email", 623 crate::comms::CommsChannel::Discord => "discord", 624 crate::comms::CommsChannel::Telegram => "telegram", 625 crate::comms::CommsChannel::Signal => "signal", 626 }; 627 Json(ConfirmSignupOutput { 628 access_jwt: access_meta.token, 629 refresh_jwt: refresh_meta.token, 630 handle: row.handle, 631 did: row.did, 632 email: row.email, 633 email_verified, 634 preferred_channel: preferred_channel.to_string(), 635 preferred_channel_verified: true, 636 }) 637 .into_response() 638} 639 640#[derive(Deserialize)] 641#[serde(rename_all = "camelCase")] 642pub struct ResendVerificationInput { 643 pub did: String, 644} 645 646pub async fn resend_verification( 647 State(state): State<AppState>, 648 Json(input): Json<ResendVerificationInput>, 649) -> Response { 650 info!("resend_verification called for DID: {}", input.did); 651 let row = match sqlx::query!( 652 r#"SELECT 653 id, handle, email, 654 preferred_comms_channel as "channel: crate::comms::CommsChannel", 655 discord_id, telegram_username, signal_number, 656 email_verified, discord_verified, telegram_verified, signal_verified 657 FROM users 658 WHERE did = $1"#, 659 input.did 660 ) 661 .fetch_optional(&state.db) 662 .await 663 { 664 Ok(Some(row)) => row, 665 Ok(None) => { 666 return ApiError::InvalidRequest("User not found".into()).into_response(); 667 } 668 Err(e) => { 669 error!("Database error in resend_verification: {:?}", e); 670 return ApiError::InternalError.into_response(); 671 } 672 }; 673 let is_verified = 674 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified; 675 if is_verified { 676 return ApiError::InvalidRequest("Account is already verified".into()).into_response(); 677 } 678 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000); 679 let code_expires_at = Utc::now() + chrono::Duration::minutes(30); 680 681 let email = row.email.clone(); 682 683 if let Err(e) = sqlx::query!( 684 r#" 685 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) 686 VALUES ($1, 'email', $2, $3, $4) 687 ON CONFLICT (user_id, channel) DO UPDATE 688 SET code = $2, pending_identifier = $3, expires_at = $4, created_at = NOW() 689 "#, 690 row.id, 691 verification_code, 692 email, 693 code_expires_at 694 ) 695 .execute(&state.db) 696 .await 697 { 698 error!("Failed to update verification code: {:?}", e); 699 return ApiError::InternalError.into_response(); 700 } 701 let (channel_str, recipient) = match row.channel { 702 crate::comms::CommsChannel::Email => ("email", row.email.unwrap_or_default()), 703 crate::comms::CommsChannel::Discord => ("discord", row.discord_id.unwrap_or_default()), 704 crate::comms::CommsChannel::Telegram => { 705 ("telegram", row.telegram_username.unwrap_or_default()) 706 } 707 crate::comms::CommsChannel::Signal => ("signal", row.signal_number.unwrap_or_default()), 708 }; 709 if let Err(e) = crate::comms::enqueue_signup_verification( 710 &state.db, 711 row.id, 712 channel_str, 713 &recipient, 714 &verification_code, 715 ) 716 .await 717 { 718 warn!("Failed to enqueue verification notification: {:?}", e); 719 } 720 Json(json!({"success": true})).into_response() 721} 722 723#[derive(Serialize)] 724#[serde(rename_all = "camelCase")] 725pub struct SessionInfo { 726 pub id: String, 727 pub created_at: String, 728 pub expires_at: String, 729 pub is_current: bool, 730} 731 732#[derive(Serialize)] 733#[serde(rename_all = "camelCase")] 734pub struct ListSessionsOutput { 735 pub sessions: Vec<SessionInfo>, 736} 737 738pub async fn list_sessions( 739 State(state): State<AppState>, 740 headers: HeaderMap, 741 auth: BearerAuth, 742) -> Response { 743 let current_jti = headers 744 .get("authorization") 745 .and_then(|v| v.to_str().ok()) 746 .and_then(|v| v.strip_prefix("Bearer ")) 747 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 748 let result = sqlx::query_as::< 749 _, 750 ( 751 i32, 752 String, 753 chrono::DateTime<chrono::Utc>, 754 chrono::DateTime<chrono::Utc>, 755 ), 756 >( 757 r#" 758 SELECT id, access_jti, created_at, refresh_expires_at 759 FROM session_tokens 760 WHERE did = $1 AND refresh_expires_at > NOW() 761 ORDER BY created_at DESC 762 "#, 763 ) 764 .bind(&auth.0.did) 765 .fetch_all(&state.db) 766 .await; 767 match result { 768 Ok(rows) => { 769 let sessions: Vec<SessionInfo> = rows 770 .into_iter() 771 .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 772 id: id.to_string(), 773 created_at: created_at.to_rfc3339(), 774 expires_at: expires_at.to_rfc3339(), 775 is_current: current_jti.as_ref() == Some(&access_jti), 776 }) 777 .collect(); 778 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() 779 } 780 Err(e) => { 781 error!("DB error in list_sessions: {:?}", e); 782 ( 783 StatusCode::INTERNAL_SERVER_ERROR, 784 Json(json!({"error": "InternalError"})), 785 ) 786 .into_response() 787 } 788 } 789} 790 791#[derive(Deserialize)] 792#[serde(rename_all = "camelCase")] 793pub struct RevokeSessionInput { 794 pub session_id: String, 795} 796 797pub async fn revoke_session( 798 State(state): State<AppState>, 799 auth: BearerAuth, 800 Json(input): Json<RevokeSessionInput>, 801) -> Response { 802 let session_id: i32 = match input.session_id.parse() { 803 Ok(id) => id, 804 Err(_) => { 805 return ( 806 StatusCode::BAD_REQUEST, 807 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})), 808 ) 809 .into_response(); 810 } 811 }; 812 let session = sqlx::query_as::<_, (String,)>( 813 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2", 814 ) 815 .bind(session_id) 816 .bind(&auth.0.did) 817 .fetch_optional(&state.db) 818 .await; 819 let access_jti = match session { 820 Ok(Some((jti,))) => jti, 821 Ok(None) => { 822 return ( 823 StatusCode::NOT_FOUND, 824 Json(json!({"error": "SessionNotFound", "message": "Session not found"})), 825 ) 826 .into_response(); 827 } 828 Err(e) => { 829 error!("DB error in revoke_session: {:?}", e); 830 return ( 831 StatusCode::INTERNAL_SERVER_ERROR, 832 Json(json!({"error": "InternalError"})), 833 ) 834 .into_response(); 835 } 836 }; 837 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1") 838 .bind(session_id) 839 .execute(&state.db) 840 .await 841 { 842 error!("DB error deleting session: {:?}", e); 843 return ( 844 StatusCode::INTERNAL_SERVER_ERROR, 845 Json(json!({"error": "InternalError"})), 846 ) 847 .into_response(); 848 } 849 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti); 850 if let Err(e) = state.cache.delete(&cache_key).await { 851 warn!("Failed to invalidate session cache: {:?}", e); 852 } 853 info!(did = %auth.0.did, session_id = %session_id, "Session revoked"); 854 (StatusCode::OK, Json(json!({}))).into_response() 855}