this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for")
18 && let Ok(value) = forwarded.to_str()
19 && let Some(first_ip) = value.split(',').next()
20 {
21 return first_ip.trim().to_string();
22 }
23 if let Some(real_ip) = headers.get("x-real-ip")
24 && let Ok(value) = real_ip.to_str()
25 {
26 return value.trim().to_string();
27 }
28 "unknown".to_string()
29}
30
31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
32 let suffix = format!(".{}", pds_hostname);
33 if identifier.ends_with(&suffix) {
34 identifier[..identifier.len() - suffix.len()].to_string()
35 } else {
36 identifier.to_string()
37 }
38}
39
40fn full_handle(stored_handle: &str, pds_hostname: &str) -> String {
41 let suffix = format!(".{}", pds_hostname);
42 if stored_handle.ends_with(&suffix) || stored_handle.ends_with(pds_hostname) {
43 stored_handle.to_string()
44 } else {
45 format!("{}.{}", stored_handle, pds_hostname)
46 }
47}
48
49#[derive(Deserialize)]
50pub struct CreateSessionInput {
51 pub identifier: String,
52 pub password: String,
53}
54
55#[derive(Serialize)]
56#[serde(rename_all = "camelCase")]
57pub struct CreateSessionOutput {
58 pub access_jwt: String,
59 pub refresh_jwt: String,
60 pub handle: String,
61 pub did: String,
62}
63
64pub async fn create_session(
65 State(state): State<AppState>,
66 headers: HeaderMap,
67 Json(input): Json<CreateSessionInput>,
68) -> Response {
69 info!("create_session called");
70 let client_ip = extract_client_ip(&headers);
71 if !state
72 .check_rate_limit(RateLimitKind::Login, &client_ip)
73 .await
74 {
75 warn!(ip = %client_ip, "Login rate limit exceeded");
76 return (
77 StatusCode::TOO_MANY_REQUESTS,
78 Json(json!({
79 "error": "RateLimitExceeded",
80 "message": "Too many login attempts. Please try again later."
81 })),
82 )
83 .into_response();
84 }
85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
87 let row = match sqlx::query!(
88 r#"SELECT
89 u.id, u.did, u.handle, u.password_hash,
90 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
91 k.key_bytes, k.encryption_version
92 FROM users u
93 JOIN user_keys k ON u.id = k.user_id
94 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
95 normalized_identifier
96 )
97 .fetch_optional(&state.db)
98 .await
99 {
100 Ok(Some(row)) => row,
101 Ok(None) => {
102 let _ = verify(
103 &input.password,
104 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
105 );
106 warn!("User not found for login attempt");
107 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
108 .into_response();
109 }
110 Err(e) => {
111 error!("Database error fetching user: {:?}", e);
112 return ApiError::InternalError.into_response();
113 }
114 };
115 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
116 Ok(k) => k,
117 Err(e) => {
118 error!("Failed to decrypt user key: {:?}", e);
119 return ApiError::InternalError.into_response();
120 }
121 };
122 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) {
123 true
124 } else {
125 let app_passwords = sqlx::query!(
126 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
127 row.id
128 )
129 .fetch_all(&state.db)
130 .await
131 .unwrap_or_default();
132 app_passwords
133 .iter()
134 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
135 };
136 if !password_valid {
137 warn!("Password verification failed for login attempt");
138 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
139 .into_response();
140 }
141 let is_verified =
142 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
143 if !is_verified {
144 warn!("Login attempt for unverified account: {}", row.did);
145 return (
146 StatusCode::FORBIDDEN,
147 Json(json!({
148 "error": "AccountNotVerified",
149 "message": "Please verify your account before logging in",
150 "did": row.did
151 })),
152 )
153 .into_response();
154 }
155 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
156 Ok(m) => m,
157 Err(e) => {
158 error!("Failed to create access token: {:?}", e);
159 return ApiError::InternalError.into_response();
160 }
161 };
162 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
163 Ok(m) => m,
164 Err(e) => {
165 error!("Failed to create refresh token: {:?}", e);
166 return ApiError::InternalError.into_response();
167 }
168 };
169 if let Err(e) = sqlx::query!(
170 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
171 row.did,
172 access_meta.jti,
173 refresh_meta.jti,
174 access_meta.expires_at,
175 refresh_meta.expires_at
176 )
177 .execute(&state.db)
178 .await
179 {
180 error!("Failed to insert session: {:?}", e);
181 return ApiError::InternalError.into_response();
182 }
183 let handle = full_handle(&row.handle, &pds_hostname);
184 Json(CreateSessionOutput {
185 access_jwt: access_meta.token,
186 refresh_jwt: refresh_meta.token,
187 handle,
188 did: row.did,
189 })
190 .into_response()
191}
192
193pub async fn get_session(
194 State(state): State<AppState>,
195 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
196) -> Response {
197 let permissions = auth_user.permissions();
198 let can_read_email = permissions.allows_email_read();
199
200 match sqlx::query!(
201 r#"SELECT
202 handle, email, email_verified, is_admin, deactivated_at,
203 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
204 discord_verified, telegram_verified, signal_verified
205 FROM users WHERE did = $1"#,
206 auth_user.did
207 )
208 .fetch_optional(&state.db)
209 .await
210 {
211 Ok(Some(row)) => {
212 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
213 crate::comms::CommsChannel::Email => ("email", row.email_verified),
214 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
215 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
216 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
217 };
218 let pds_hostname =
219 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
220 let handle = full_handle(&row.handle, &pds_hostname);
221 let is_active = row.deactivated_at.is_none();
222 let email_value = if can_read_email {
223 row.email.clone()
224 } else {
225 None
226 };
227 let email_verified_value = can_read_email && row.email_verified;
228 Json(json!({
229 "handle": handle,
230 "did": auth_user.did,
231 "email": email_value,
232 "emailVerified": email_verified_value,
233 "preferredChannel": preferred_channel,
234 "preferredChannelVerified": preferred_channel_verified,
235 "isAdmin": row.is_admin,
236 "active": is_active,
237 "status": if is_active { "active" } else { "deactivated" },
238 "didDoc": {}
239 }))
240 .into_response()
241 }
242 Ok(None) => ApiError::AuthenticationFailed.into_response(),
243 Err(e) => {
244 error!("Database error in get_session: {:?}", e);
245 ApiError::InternalError.into_response()
246 }
247 }
248}
249
250pub async fn delete_session(
251 State(state): State<AppState>,
252 headers: axum::http::HeaderMap,
253) -> Response {
254 let token = match crate::auth::extract_bearer_token_from_header(
255 headers.get("Authorization").and_then(|h| h.to_str().ok()),
256 ) {
257 Some(t) => t,
258 None => return ApiError::AuthenticationRequired.into_response(),
259 };
260 let jti = match crate::auth::get_jti_from_token(&token) {
261 Ok(jti) => jti,
262 Err(_) => return ApiError::AuthenticationFailed.into_response(),
263 };
264 let did = crate::auth::get_did_from_token(&token).ok();
265 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
266 .execute(&state.db)
267 .await
268 {
269 Ok(res) if res.rows_affected() > 0 => {
270 if let Some(did) = did {
271 let session_cache_key = format!("auth:session:{}:{}", did, jti);
272 let _ = state.cache.delete(&session_cache_key).await;
273 }
274 Json(json!({})).into_response()
275 }
276 Ok(_) => ApiError::AuthenticationFailed.into_response(),
277 Err(e) => {
278 error!("Database error in delete_session: {:?}", e);
279 ApiError::AuthenticationFailed.into_response()
280 }
281 }
282}
283
284pub async fn refresh_session(
285 State(state): State<AppState>,
286 headers: axum::http::HeaderMap,
287) -> Response {
288 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
289 if !state
290 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
291 .await
292 {
293 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
294 return (
295 axum::http::StatusCode::TOO_MANY_REQUESTS,
296 axum::Json(serde_json::json!({
297 "error": "RateLimitExceeded",
298 "message": "Too many requests. Please try again later."
299 })),
300 )
301 .into_response();
302 }
303 let refresh_token = match crate::auth::extract_bearer_token_from_header(
304 headers.get("Authorization").and_then(|h| h.to_str().ok()),
305 ) {
306 Some(t) => t,
307 None => return ApiError::AuthenticationRequired.into_response(),
308 };
309 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
310 Ok(jti) => jti,
311 Err(_) => {
312 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
313 .into_response();
314 }
315 };
316 let mut tx = match state.db.begin().await {
317 Ok(tx) => tx,
318 Err(e) => {
319 error!("Failed to begin transaction: {:?}", e);
320 return ApiError::InternalError.into_response();
321 }
322 };
323 if let Ok(Some(session_id)) = sqlx::query_scalar!(
324 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
325 refresh_jti
326 )
327 .fetch_optional(&mut *tx)
328 .await
329 {
330 warn!(
331 "Refresh token reuse detected! Revoking token family for session_id: {}",
332 session_id
333 );
334 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
335 .execute(&mut *tx)
336 .await;
337 let _ = tx.commit().await;
338 return ApiError::ExpiredTokenMsg(
339 "Refresh token has been revoked due to suspected compromise".into(),
340 )
341 .into_response();
342 }
343 let session_row = match sqlx::query!(
344 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
345 FROM session_tokens st
346 JOIN users u ON st.did = u.did
347 JOIN user_keys k ON u.id = k.user_id
348 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
349 FOR UPDATE OF st"#,
350 refresh_jti
351 )
352 .fetch_optional(&mut *tx)
353 .await
354 {
355 Ok(Some(row)) => row,
356 Ok(None) => {
357 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
358 .into_response();
359 }
360 Err(e) => {
361 error!("Database error fetching session: {:?}", e);
362 return ApiError::InternalError.into_response();
363 }
364 };
365 let key_bytes =
366 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
367 Ok(k) => k,
368 Err(e) => {
369 error!("Failed to decrypt user key: {:?}", e);
370 return ApiError::InternalError.into_response();
371 }
372 };
373 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
374 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
375 }
376 let new_access_meta =
377 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
378 Ok(m) => m,
379 Err(e) => {
380 error!("Failed to create access token: {:?}", e);
381 return ApiError::InternalError.into_response();
382 }
383 };
384 let new_refresh_meta =
385 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
386 Ok(m) => m,
387 Err(e) => {
388 error!("Failed to create refresh token: {:?}", e);
389 return ApiError::InternalError.into_response();
390 }
391 };
392 match sqlx::query!(
393 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
394 refresh_jti,
395 session_row.id
396 )
397 .execute(&mut *tx)
398 .await
399 {
400 Ok(result) if result.rows_affected() == 0 => {
401 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
402 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
403 .execute(&mut *tx)
404 .await;
405 let _ = tx.commit().await;
406 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
407 }
408 Err(e) => {
409 error!("Failed to record used refresh token: {:?}", e);
410 return ApiError::InternalError.into_response();
411 }
412 Ok(_) => {}
413 }
414 if let Err(e) = sqlx::query!(
415 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
416 new_access_meta.jti,
417 new_refresh_meta.jti,
418 new_access_meta.expires_at,
419 new_refresh_meta.expires_at,
420 session_row.id
421 )
422 .execute(&mut *tx)
423 .await
424 {
425 error!("Database error updating session: {:?}", e);
426 return ApiError::InternalError.into_response();
427 }
428 if let Err(e) = tx.commit().await {
429 error!("Failed to commit transaction: {:?}", e);
430 return ApiError::InternalError.into_response();
431 }
432 match sqlx::query!(
433 r#"SELECT
434 handle, email, email_verified, is_admin,
435 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
436 discord_verified, telegram_verified, signal_verified
437 FROM users WHERE did = $1"#,
438 session_row.did
439 )
440 .fetch_optional(&state.db)
441 .await
442 {
443 Ok(Some(u)) => {
444 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
445 crate::comms::CommsChannel::Email => ("email", u.email_verified),
446 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
447 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
448 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
449 };
450 let pds_hostname =
451 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
452 let handle = full_handle(&u.handle, &pds_hostname);
453 Json(json!({
454 "accessJwt": new_access_meta.token,
455 "refreshJwt": new_refresh_meta.token,
456 "handle": handle,
457 "did": session_row.did,
458 "email": u.email,
459 "emailVerified": u.email_verified,
460 "preferredChannel": preferred_channel,
461 "preferredChannelVerified": preferred_channel_verified,
462 "isAdmin": u.is_admin,
463 "active": true
464 }))
465 .into_response()
466 }
467 Ok(None) => {
468 error!("User not found for existing session: {}", session_row.did);
469 ApiError::InternalError.into_response()
470 }
471 Err(e) => {
472 error!("Database error fetching user: {:?}", e);
473 ApiError::InternalError.into_response()
474 }
475 }
476}
477
478#[derive(Deserialize)]
479#[serde(rename_all = "camelCase")]
480pub struct ConfirmSignupInput {
481 pub did: String,
482 pub verification_code: String,
483}
484
485#[derive(Serialize)]
486#[serde(rename_all = "camelCase")]
487pub struct ConfirmSignupOutput {
488 pub access_jwt: String,
489 pub refresh_jwt: String,
490 pub handle: String,
491 pub did: String,
492 pub email: Option<String>,
493 pub email_verified: bool,
494 pub preferred_channel: String,
495 pub preferred_channel_verified: bool,
496}
497
498pub async fn confirm_signup(
499 State(state): State<AppState>,
500 Json(input): Json<ConfirmSignupInput>,
501) -> Response {
502 info!("confirm_signup called for DID: {}", input.did);
503 let row = match sqlx::query!(
504 r#"SELECT
505 u.id, u.did, u.handle, u.email,
506 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
507 k.key_bytes, k.encryption_version
508 FROM users u
509 JOIN user_keys k ON u.id = k.user_id
510 WHERE u.did = $1"#,
511 input.did
512 )
513 .fetch_optional(&state.db)
514 .await
515 {
516 Ok(Some(row)) => row,
517 Ok(None) => {
518 warn!("User not found for confirm_signup: {}", input.did);
519 return ApiError::InvalidRequest("Invalid DID or verification code".into())
520 .into_response();
521 }
522 Err(e) => {
523 error!("Database error in confirm_signup: {:?}", e);
524 return ApiError::InternalError.into_response();
525 }
526 };
527
528 let verification = match sqlx::query!(
529 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = 'email'",
530 row.id
531 )
532 .fetch_optional(&state.db)
533 .await
534 {
535 Ok(Some(v)) => v,
536 Ok(None) => {
537 warn!("No verification code found for user: {}", input.did);
538 return ApiError::InvalidRequest("No pending verification".into()).into_response();
539 }
540 Err(e) => {
541 error!("Database error fetching verification: {:?}", e);
542 return ApiError::InternalError.into_response();
543 }
544 };
545
546 if verification.code != input.verification_code {
547 warn!("Invalid verification code for user: {}", input.did);
548 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
549 }
550 if verification.expires_at < Utc::now() {
551 warn!("Verification code expired for user: {}", input.did);
552 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
553 }
554
555 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
556 Ok(k) => k,
557 Err(e) => {
558 error!("Failed to decrypt user key: {:?}", e);
559 return ApiError::InternalError.into_response();
560 }
561 };
562 let verified_column = match row.channel {
563 crate::comms::CommsChannel::Email => "email_verified",
564 crate::comms::CommsChannel::Discord => "discord_verified",
565 crate::comms::CommsChannel::Telegram => "telegram_verified",
566 crate::comms::CommsChannel::Signal => "signal_verified",
567 };
568 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
569 if let Err(e) = sqlx::query(&update_query)
570 .bind(&input.did)
571 .execute(&state.db)
572 .await
573 {
574 error!("Failed to update verification status: {:?}", e);
575 return ApiError::InternalError.into_response();
576 }
577
578 if let Err(e) = sqlx::query!(
579 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = 'email'",
580 row.id
581 )
582 .execute(&state.db)
583 .await
584 {
585 error!("Failed to delete verification record: {:?}", e);
586 }
587
588 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
589 Ok(m) => m,
590 Err(e) => {
591 error!("Failed to create access token: {:?}", e);
592 return ApiError::InternalError.into_response();
593 }
594 };
595 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
596 Ok(m) => m,
597 Err(e) => {
598 error!("Failed to create refresh token: {:?}", e);
599 return ApiError::InternalError.into_response();
600 }
601 };
602 if let Err(e) = sqlx::query!(
603 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
604 row.did,
605 access_meta.jti,
606 refresh_meta.jti,
607 access_meta.expires_at,
608 refresh_meta.expires_at
609 )
610 .execute(&state.db)
611 .await
612 {
613 error!("Failed to insert session: {:?}", e);
614 return ApiError::InternalError.into_response();
615 }
616 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
617 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
618 warn!("Failed to enqueue welcome notification: {:?}", e);
619 }
620 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
621 let preferred_channel = match row.channel {
622 crate::comms::CommsChannel::Email => "email",
623 crate::comms::CommsChannel::Discord => "discord",
624 crate::comms::CommsChannel::Telegram => "telegram",
625 crate::comms::CommsChannel::Signal => "signal",
626 };
627 Json(ConfirmSignupOutput {
628 access_jwt: access_meta.token,
629 refresh_jwt: refresh_meta.token,
630 handle: row.handle,
631 did: row.did,
632 email: row.email,
633 email_verified,
634 preferred_channel: preferred_channel.to_string(),
635 preferred_channel_verified: true,
636 })
637 .into_response()
638}
639
640#[derive(Deserialize)]
641#[serde(rename_all = "camelCase")]
642pub struct ResendVerificationInput {
643 pub did: String,
644}
645
646pub async fn resend_verification(
647 State(state): State<AppState>,
648 Json(input): Json<ResendVerificationInput>,
649) -> Response {
650 info!("resend_verification called for DID: {}", input.did);
651 let row = match sqlx::query!(
652 r#"SELECT
653 id, handle, email,
654 preferred_comms_channel as "channel: crate::comms::CommsChannel",
655 discord_id, telegram_username, signal_number,
656 email_verified, discord_verified, telegram_verified, signal_verified
657 FROM users
658 WHERE did = $1"#,
659 input.did
660 )
661 .fetch_optional(&state.db)
662 .await
663 {
664 Ok(Some(row)) => row,
665 Ok(None) => {
666 return ApiError::InvalidRequest("User not found".into()).into_response();
667 }
668 Err(e) => {
669 error!("Database error in resend_verification: {:?}", e);
670 return ApiError::InternalError.into_response();
671 }
672 };
673 let is_verified =
674 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
675 if is_verified {
676 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
677 }
678 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
679 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
680
681 let email = row.email.clone();
682
683 if let Err(e) = sqlx::query!(
684 r#"
685 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at)
686 VALUES ($1, 'email', $2, $3, $4)
687 ON CONFLICT (user_id, channel) DO UPDATE
688 SET code = $2, pending_identifier = $3, expires_at = $4, created_at = NOW()
689 "#,
690 row.id,
691 verification_code,
692 email,
693 code_expires_at
694 )
695 .execute(&state.db)
696 .await
697 {
698 error!("Failed to update verification code: {:?}", e);
699 return ApiError::InternalError.into_response();
700 }
701 let (channel_str, recipient) = match row.channel {
702 crate::comms::CommsChannel::Email => ("email", row.email.unwrap_or_default()),
703 crate::comms::CommsChannel::Discord => ("discord", row.discord_id.unwrap_or_default()),
704 crate::comms::CommsChannel::Telegram => {
705 ("telegram", row.telegram_username.unwrap_or_default())
706 }
707 crate::comms::CommsChannel::Signal => ("signal", row.signal_number.unwrap_or_default()),
708 };
709 if let Err(e) = crate::comms::enqueue_signup_verification(
710 &state.db,
711 row.id,
712 channel_str,
713 &recipient,
714 &verification_code,
715 )
716 .await
717 {
718 warn!("Failed to enqueue verification notification: {:?}", e);
719 }
720 Json(json!({"success": true})).into_response()
721}
722
723#[derive(Serialize)]
724#[serde(rename_all = "camelCase")]
725pub struct SessionInfo {
726 pub id: String,
727 pub created_at: String,
728 pub expires_at: String,
729 pub is_current: bool,
730}
731
732#[derive(Serialize)]
733#[serde(rename_all = "camelCase")]
734pub struct ListSessionsOutput {
735 pub sessions: Vec<SessionInfo>,
736}
737
738pub async fn list_sessions(
739 State(state): State<AppState>,
740 headers: HeaderMap,
741 auth: BearerAuth,
742) -> Response {
743 let current_jti = headers
744 .get("authorization")
745 .and_then(|v| v.to_str().ok())
746 .and_then(|v| v.strip_prefix("Bearer "))
747 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
748 let result = sqlx::query_as::<
749 _,
750 (
751 i32,
752 String,
753 chrono::DateTime<chrono::Utc>,
754 chrono::DateTime<chrono::Utc>,
755 ),
756 >(
757 r#"
758 SELECT id, access_jti, created_at, refresh_expires_at
759 FROM session_tokens
760 WHERE did = $1 AND refresh_expires_at > NOW()
761 ORDER BY created_at DESC
762 "#,
763 )
764 .bind(&auth.0.did)
765 .fetch_all(&state.db)
766 .await;
767 match result {
768 Ok(rows) => {
769 let sessions: Vec<SessionInfo> = rows
770 .into_iter()
771 .map(|(id, access_jti, created_at, expires_at)| SessionInfo {
772 id: id.to_string(),
773 created_at: created_at.to_rfc3339(),
774 expires_at: expires_at.to_rfc3339(),
775 is_current: current_jti.as_ref() == Some(&access_jti),
776 })
777 .collect();
778 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
779 }
780 Err(e) => {
781 error!("DB error in list_sessions: {:?}", e);
782 (
783 StatusCode::INTERNAL_SERVER_ERROR,
784 Json(json!({"error": "InternalError"})),
785 )
786 .into_response()
787 }
788 }
789}
790
791#[derive(Deserialize)]
792#[serde(rename_all = "camelCase")]
793pub struct RevokeSessionInput {
794 pub session_id: String,
795}
796
797pub async fn revoke_session(
798 State(state): State<AppState>,
799 auth: BearerAuth,
800 Json(input): Json<RevokeSessionInput>,
801) -> Response {
802 let session_id: i32 = match input.session_id.parse() {
803 Ok(id) => id,
804 Err(_) => {
805 return (
806 StatusCode::BAD_REQUEST,
807 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
808 )
809 .into_response();
810 }
811 };
812 let session = sqlx::query_as::<_, (String,)>(
813 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
814 )
815 .bind(session_id)
816 .bind(&auth.0.did)
817 .fetch_optional(&state.db)
818 .await;
819 let access_jti = match session {
820 Ok(Some((jti,))) => jti,
821 Ok(None) => {
822 return (
823 StatusCode::NOT_FOUND,
824 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
825 )
826 .into_response();
827 }
828 Err(e) => {
829 error!("DB error in revoke_session: {:?}", e);
830 return (
831 StatusCode::INTERNAL_SERVER_ERROR,
832 Json(json!({"error": "InternalError"})),
833 )
834 .into_response();
835 }
836 };
837 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
838 .bind(session_id)
839 .execute(&state.db)
840 .await
841 {
842 error!("DB error deleting session: {:?}", e);
843 return (
844 StatusCode::INTERNAL_SERVER_ERROR,
845 Json(json!({"error": "InternalError"})),
846 )
847 .into_response();
848 }
849 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
850 if let Err(e) = state.cache.delete(&cache_key).await {
851 warn!("Failed to invalidate session cache: {:?}", e);
852 }
853 info!(did = %auth.0.did, session_id = %session_id, "Session revoked");
854 (StatusCode::OK, Json(json!({}))).into_response()
855}