this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
/// Canonicalizes a login identifier before it is used for a user lookup.
///
/// After trimming surrounding whitespace:
/// - anything containing `@` or starting with `did:` is returned as-is
///   (case preserved);
/// - a value without any `.` is lower-cased and suffixed with
///   `.{pds_hostname}`;
/// - everything else is lower-cased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();

    let is_email_or_did = trimmed.starts_with("did:") || trimmed.contains('@');
    if is_email_or_did {
        return trimmed.to_owned();
    }

    let lowered = trimmed.to_lowercase();
    match trimmed.contains('.') {
        true => lowered,
        // Bare local part: qualify it with this server's hostname.
        false => format!("{}.{}", lowered, pds_hostname),
    }
}
40
/// Returns the handle to expose in API responses.
///
/// The stored handle is returned unchanged; `_pds_hostname` is accepted but
/// not used.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
44
/// Request body for `create_session`.
#[derive(Deserialize)]
pub struct CreateSessionInput {
    /// Login identifier; `create_session` normalizes it and matches it against
    /// a user's handle, email, or DID.
    pub identifier: String,
    /// Password to check; `create_session` tries the account password hash
    /// first and then the account's app passwords.
    pub password: String,
}
50
/// Successful response body of `create_session`.
///
/// Field names are serialized in camelCase (e.g. `accessJwt`, `refreshJwt`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// The user's handle as stored in the database.
    pub handle: String,
    /// The user's DID.
    pub did: String,
}
59
60pub async fn create_session(
61 State(state): State<AppState>,
62 headers: HeaderMap,
63 Json(input): Json<CreateSessionInput>,
64) -> Response {
65 info!(
66 "create_session called with identifier: {}",
67 input.identifier
68 );
69 let client_ip = extract_client_ip(&headers);
70 if !state
71 .check_rate_limit(RateLimitKind::Login, &client_ip)
72 .await
73 {
74 warn!(ip = %client_ip, "Login rate limit exceeded");
75 return (
76 StatusCode::TOO_MANY_REQUESTS,
77 Json(json!({
78 "error": "RateLimitExceeded",
79 "message": "Too many login attempts. Please try again later."
80 })),
81 )
82 .into_response();
83 }
84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
86 info!(
87 "Normalized identifier: {} -> {}",
88 input.identifier, normalized_identifier
89 );
90 let row = match sqlx::query!(
91 r#"SELECT
92 u.id, u.did, u.handle, u.password_hash,
93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
94 u.allow_legacy_login,
95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
96 k.key_bytes, k.encryption_version,
97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
98 FROM users u
99 JOIN user_keys k ON u.id = k.user_id
100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
101 normalized_identifier
102 )
103 .fetch_optional(&state.db)
104 .await
105 {
106 Ok(Some(row)) => row,
107 Ok(None) => {
108 let _ = verify(
109 &input.password,
110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
111 );
112 warn!("User not found for login attempt");
113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
114 .into_response();
115 }
116 Err(e) => {
117 error!("Database error fetching user: {:?}", e);
118 return ApiError::InternalError.into_response();
119 }
120 };
121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
122 Ok(k) => k,
123 Err(e) => {
124 error!("Failed to decrypt user key: {:?}", e);
125 return ApiError::InternalError.into_response();
126 }
127 };
128 let (password_valid, app_password_scopes) = if row
129 .password_hash
130 .as_ref()
131 .map(|h| verify(&input.password, h).unwrap_or(false))
132 .unwrap_or(false)
133 {
134 (true, None)
135 } else {
136 let app_passwords = sqlx::query!(
137 "SELECT password_hash, scopes FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
138 row.id
139 )
140 .fetch_all(&state.db)
141 .await
142 .unwrap_or_default();
143 let matched = app_passwords
144 .iter()
145 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
146 match matched {
147 Some(app) => (true, app.scopes.clone()),
148 None => (false, None),
149 }
150 };
151 if !password_valid {
152 warn!("Password verification failed for login attempt");
153 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
154 .into_response();
155 }
156 let is_verified =
157 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
158 if !is_verified {
159 warn!("Login attempt for unverified account: {}", row.did);
160 return (
161 StatusCode::FORBIDDEN,
162 Json(json!({
163 "error": "AccountNotVerified",
164 "message": "Please verify your account before logging in",
165 "did": row.did
166 })),
167 )
168 .into_response();
169 }
170 let has_totp = row.totp_enabled.unwrap_or(false);
171 let is_legacy_login = has_totp;
172 if has_totp && !row.allow_legacy_login {
173 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
174 return (
175 StatusCode::FORBIDDEN,
176 Json(json!({
177 "error": "MfaRequired",
178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
179 "did": row.did
180 })),
181 )
182 .into_response();
183 }
184 let access_meta = match crate::auth::create_access_token_with_scope_metadata(
185 &row.did,
186 &key_bytes,
187 app_password_scopes.as_deref(),
188 ) {
189 Ok(m) => m,
190 Err(e) => {
191 error!("Failed to create access token: {:?}", e);
192 return ApiError::InternalError.into_response();
193 }
194 };
195 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
196 Ok(m) => m,
197 Err(e) => {
198 error!("Failed to create refresh token: {:?}", e);
199 return ApiError::InternalError.into_response();
200 }
201 };
202 if let Err(e) = sqlx::query!(
203 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
204 row.did,
205 access_meta.jti,
206 refresh_meta.jti,
207 access_meta.expires_at,
208 refresh_meta.expires_at,
209 is_legacy_login,
210 false,
211 app_password_scopes
212 )
213 .execute(&state.db)
214 .await
215 {
216 error!("Failed to insert session: {:?}", e);
217 return ApiError::InternalError.into_response();
218 }
219 if is_legacy_login {
220 warn!(
221 did = %row.did,
222 ip = %client_ip,
223 "Legacy login on TOTP-enabled account - sending notification"
224 );
225 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
226 if let Err(e) = crate::comms::queue_legacy_login_notification(
227 &state.db,
228 row.id,
229 &hostname,
230 &client_ip,
231 row.preferred_comms_channel,
232 )
233 .await
234 {
235 error!("Failed to queue legacy login notification: {:?}", e);
236 }
237 }
238 let handle = full_handle(&row.handle, &pds_hostname);
239 Json(CreateSessionOutput {
240 access_jwt: access_meta.token,
241 refresh_jwt: refresh_meta.token,
242 handle,
243 did: row.did,
244 })
245 .into_response()
246}
247
248pub async fn get_session(
249 State(state): State<AppState>,
250 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
251) -> Response {
252 let permissions = auth_user.permissions();
253 let can_read_email = permissions.allows_email_read();
254
255 match sqlx::query!(
256 r#"SELECT
257 handle, email, email_verified, is_admin, deactivated_at, preferred_locale,
258 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
259 discord_verified, telegram_verified, signal_verified
260 FROM users WHERE did = $1"#,
261 auth_user.did
262 )
263 .fetch_optional(&state.db)
264 .await
265 {
266 Ok(Some(row)) => {
267 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
268 crate::comms::CommsChannel::Email => ("email", row.email_verified),
269 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
270 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
271 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
272 };
273 let pds_hostname =
274 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
275 let handle = full_handle(&row.handle, &pds_hostname);
276 let is_active = row.deactivated_at.is_none();
277 let email_value = if can_read_email {
278 row.email.clone()
279 } else {
280 None
281 };
282 let email_verified_value = can_read_email && row.email_verified;
283 Json(json!({
284 "handle": handle,
285 "did": auth_user.did,
286 "email": email_value,
287 "emailVerified": email_verified_value,
288 "preferredChannel": preferred_channel,
289 "preferredChannelVerified": preferred_channel_verified,
290 "preferredLocale": row.preferred_locale,
291 "isAdmin": row.is_admin,
292 "active": is_active,
293 "status": if is_active { "active" } else { "deactivated" },
294 "didDoc": {}
295 }))
296 .into_response()
297 }
298 Ok(None) => ApiError::AuthenticationFailed.into_response(),
299 Err(e) => {
300 error!("Database error in get_session: {:?}", e);
301 ApiError::InternalError.into_response()
302 }
303 }
304}
305
306pub async fn delete_session(
307 State(state): State<AppState>,
308 headers: axum::http::HeaderMap,
309) -> Response {
310 let token = match crate::auth::extract_bearer_token_from_header(
311 headers.get("Authorization").and_then(|h| h.to_str().ok()),
312 ) {
313 Some(t) => t,
314 None => return ApiError::AuthenticationRequired.into_response(),
315 };
316 let jti = match crate::auth::get_jti_from_token(&token) {
317 Ok(jti) => jti,
318 Err(_) => return ApiError::AuthenticationFailed.into_response(),
319 };
320 let did = crate::auth::get_did_from_token(&token).ok();
321 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
322 .execute(&state.db)
323 .await
324 {
325 Ok(res) if res.rows_affected() > 0 => {
326 if let Some(did) = did {
327 let session_cache_key = format!("auth:session:{}:{}", did, jti);
328 let _ = state.cache.delete(&session_cache_key).await;
329 }
330 Json(json!({})).into_response()
331 }
332 Ok(_) => ApiError::AuthenticationFailed.into_response(),
333 Err(e) => {
334 error!("Database error in delete_session: {:?}", e);
335 ApiError::AuthenticationFailed.into_response()
336 }
337 }
338}
339
/// Exchanges a refresh token for a new access/refresh token pair.
///
/// Flow:
/// 1. Rate-limit by client IP (`RateLimitKind::RefreshSession`).
/// 2. Read the bearer token from `Authorization` and extract its `jti`.
/// 3. Inside one database transaction:
///    - if the `jti` is already listed in `used_refresh_tokens`, delete the
///      owning session and answer with `ExpiredTokenMsg` (reuse detection);
///    - load and lock the unexpired session row for this `jti`;
///    - verify the refresh token with the user's decrypted key;
///    - record the `jti` as used and store the new token ids on the session.
/// 4. After the commit, load the user's profile and return it together with
///    the new tokens.
///
/// Return paths that leave before `tx.commit()` simply drop `tx`.
/// NOTE(review): this relies on the transaction being rolled back when it is
/// dropped — confirm against the sqlx documentation.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    // Uses the shared helper from `crate::rate_limit`, not the file-local `extract_client_ip`.
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return (
            axum::http::StatusCode::TOO_MANY_REQUESTS,
            axum::Json(serde_json::json!({
                "error": "RateLimitExceeded",
                "message": "Too many requests. Please try again later."
            })),
        )
            .into_response();
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Reuse detection: a jti that was already exchanged once revokes the whole session.
    // NOTE(review): because of the `if let Ok(Some(..))` pattern, a database
    // error in this lookup is treated the same as "not reused" and is not logged.
    if let Ok(Some(session_id)) = sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        warn!(
            "Refresh token reuse detected! Revoking token family for session_id: {}",
            session_id
        );
        // Errors from the delete and the commit are deliberately ignored here;
        // the request is rejected either way.
        let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
            .execute(&mut *tx)
            .await;
        let _ = tx.commit().await;
        return ApiError::ExpiredTokenMsg(
            "Refresh token has been revoked due to suspected compromise".into(),
        )
        .into_response();
    }
    // Load the unexpired session for this jti and lock its `session_tokens` row
    // (`FOR UPDATE OF st`) for the rest of the transaction.
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, k.key_bytes, k.encryption_version
        FROM session_tokens st
        JOIN users u ON st.did = u.did
        JOIN user_keys k ON u.id = k.user_id
        WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
        FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // The token is only verified here, after the session's key has been loaded.
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    // The new access token keeps the scope stored on the session.
    let new_access_meta = match crate::auth::create_access_token_with_scope_metadata(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // Record the old jti as used. `ON CONFLICT DO NOTHING` makes a second
    // insert of the same jti affect zero rows, which is handled as reuse.
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        Ok(result) if result.rows_affected() == 0 => {
            warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
            let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                .execute(&mut *tx)
                .await;
            let _ = tx.commit().await;
            return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    // Rotate the session in place: same row id, new token ids and expiry times.
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    // The profile lookup runs on the pool (`state.db`) after the commit, so a
    // failure below does not undo the token rotation.
    match sqlx::query!(
        r#"SELECT
            handle, email, email_verified, is_admin, preferred_locale,
            preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
            discord_verified, telegram_verified, signal_verified
        FROM users WHERE did = $1"#,
        session_row.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&u.handle, &pds_hostname);
            // NOTE(review): `"active"` is hard-coded to `true` and the email is
            // returned without the permission check that `get_session` applies —
            // confirm both are intended.
            Json(json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailVerified": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": true
            }))
            .into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}
537
/// Request body for `confirm_signup`.
///
/// Field names are expected in camelCase (`did`, `verificationCode`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    /// DID of the account being confirmed.
    pub did: String,
    /// Verification code as entered by the user; `confirm_signup` normalizes it
    /// before checking it.
    pub verification_code: String,
}
544
/// Successful response body of `confirm_signup`.
///
/// Field names are serialized in camelCase (e.g. `accessJwt`, `emailVerified`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// The user's handle as stored in the database.
    pub handle: String,
    /// The user's DID.
    pub did: String,
    /// The user's email address, if one is stored.
    pub email: Option<String>,
    /// `confirm_signup` sets this to `true` only when the confirmed channel is email.
    pub email_verified: bool,
    /// Name of the preferred channel: `email`, `discord`, `telegram`, or `signal`.
    pub preferred_channel: String,
    /// Whether the preferred channel is verified; `confirm_signup` always sets `true`.
    pub preferred_channel_verified: bool,
}
557
558pub async fn confirm_signup(
559 State(state): State<AppState>,
560 Json(input): Json<ConfirmSignupInput>,
561) -> Response {
562 info!("confirm_signup called for DID: {}", input.did);
563 let row = match sqlx::query!(
564 r#"SELECT
565 u.id, u.did, u.handle, u.email,
566 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
567 u.discord_id, u.telegram_username, u.signal_number,
568 k.key_bytes, k.encryption_version
569 FROM users u
570 JOIN user_keys k ON u.id = k.user_id
571 WHERE u.did = $1"#,
572 input.did
573 )
574 .fetch_optional(&state.db)
575 .await
576 {
577 Ok(Some(row)) => row,
578 Ok(None) => {
579 warn!("User not found for confirm_signup: {}", input.did);
580 return ApiError::InvalidRequest("Invalid DID or verification code".into())
581 .into_response();
582 }
583 Err(e) => {
584 error!("Database error in confirm_signup: {:?}", e);
585 return ApiError::InternalError.into_response();
586 }
587 };
588
589 let (channel_str, identifier) = match row.channel {
590 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
591 crate::comms::CommsChannel::Discord => {
592 ("discord", row.discord_id.clone().unwrap_or_default())
593 }
594 crate::comms::CommsChannel::Telegram => (
595 "telegram",
596 row.telegram_username.clone().unwrap_or_default(),
597 ),
598 crate::comms::CommsChannel::Signal => {
599 ("signal", row.signal_number.clone().unwrap_or_default())
600 }
601 };
602
603 let normalized_token =
604 crate::auth::verification_token::normalize_token_input(&input.verification_code);
605 match crate::auth::verification_token::verify_signup_token(
606 &normalized_token,
607 channel_str,
608 &identifier,
609 ) {
610 Ok(token_data) => {
611 if token_data.did != input.did {
612 warn!(
613 "Token DID mismatch for confirm_signup: expected {}, got {}",
614 input.did, token_data.did
615 );
616 return ApiError::InvalidRequest("Invalid verification code".into())
617 .into_response();
618 }
619 }
620 Err(crate::auth::verification_token::VerifyError::Expired) => {
621 warn!("Verification code expired for user: {}", input.did);
622 return ApiError::ExpiredTokenMsg("Verification code has expired".into())
623 .into_response();
624 }
625 Err(e) => {
626 warn!("Invalid verification code for user {}: {:?}", input.did, e);
627 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
628 }
629 }
630
631 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
632 Ok(k) => k,
633 Err(e) => {
634 error!("Failed to decrypt user key: {:?}", e);
635 return ApiError::InternalError.into_response();
636 }
637 };
638 let verified_column = match row.channel {
639 crate::comms::CommsChannel::Email => "email_verified",
640 crate::comms::CommsChannel::Discord => "discord_verified",
641 crate::comms::CommsChannel::Telegram => "telegram_verified",
642 crate::comms::CommsChannel::Signal => "signal_verified",
643 };
644 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
645 if let Err(e) = sqlx::query(&update_query)
646 .bind(&input.did)
647 .execute(&state.db)
648 .await
649 {
650 error!("Failed to update verification status: {:?}", e);
651 return ApiError::InternalError.into_response();
652 }
653
654 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
655 Ok(m) => m,
656 Err(e) => {
657 error!("Failed to create access token: {:?}", e);
658 return ApiError::InternalError.into_response();
659 }
660 };
661 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
662 Ok(m) => m,
663 Err(e) => {
664 error!("Failed to create refresh token: {:?}", e);
665 return ApiError::InternalError.into_response();
666 }
667 };
668 let no_scope: Option<String> = None;
669 if let Err(e) = sqlx::query!(
670 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
671 row.did,
672 access_meta.jti,
673 refresh_meta.jti,
674 access_meta.expires_at,
675 refresh_meta.expires_at,
676 false,
677 false,
678 no_scope
679 )
680 .execute(&state.db)
681 .await
682 {
683 error!("Failed to insert session: {:?}", e);
684 return ApiError::InternalError.into_response();
685 }
686 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
687 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
688 warn!("Failed to enqueue welcome notification: {:?}", e);
689 }
690 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
691 let preferred_channel = match row.channel {
692 crate::comms::CommsChannel::Email => "email",
693 crate::comms::CommsChannel::Discord => "discord",
694 crate::comms::CommsChannel::Telegram => "telegram",
695 crate::comms::CommsChannel::Signal => "signal",
696 };
697 Json(ConfirmSignupOutput {
698 access_jwt: access_meta.token,
699 refresh_jwt: refresh_meta.token,
700 handle: row.handle,
701 did: row.did,
702 email: row.email,
703 email_verified,
704 preferred_channel: preferred_channel.to_string(),
705 preferred_channel_verified: true,
706 })
707 .into_response()
708}
709
/// Request body for `resend_verification`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    /// DID of the account that should receive a new verification code.
    pub did: String,
}
715
716pub async fn resend_verification(
717 State(state): State<AppState>,
718 Json(input): Json<ResendVerificationInput>,
719) -> Response {
720 info!("resend_verification called for DID: {}", input.did);
721 let row = match sqlx::query!(
722 r#"SELECT
723 id, handle, email,
724 preferred_comms_channel as "channel: crate::comms::CommsChannel",
725 discord_id, telegram_username, signal_number,
726 email_verified, discord_verified, telegram_verified, signal_verified
727 FROM users
728 WHERE did = $1"#,
729 input.did
730 )
731 .fetch_optional(&state.db)
732 .await
733 {
734 Ok(Some(row)) => row,
735 Ok(None) => {
736 return ApiError::InvalidRequest("User not found".into()).into_response();
737 }
738 Err(e) => {
739 error!("Database error in resend_verification: {:?}", e);
740 return ApiError::InternalError.into_response();
741 }
742 };
743 let is_verified =
744 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
745 if is_verified {
746 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
747 }
748
749 let (channel_str, recipient) = match row.channel {
750 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
751 crate::comms::CommsChannel::Discord => {
752 ("discord", row.discord_id.clone().unwrap_or_default())
753 }
754 crate::comms::CommsChannel::Telegram => (
755 "telegram",
756 row.telegram_username.clone().unwrap_or_default(),
757 ),
758 crate::comms::CommsChannel::Signal => {
759 ("signal", row.signal_number.clone().unwrap_or_default())
760 }
761 };
762
763 let verification_token =
764 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
765 let formatted_token =
766 crate::auth::verification_token::format_token_for_display(&verification_token);
767
768 if let Err(e) = crate::comms::enqueue_signup_verification(
769 &state.db,
770 row.id,
771 channel_str,
772 &recipient,
773 &formatted_token,
774 None,
775 )
776 .await
777 {
778 warn!("Failed to enqueue verification notification: {:?}", e);
779 }
780 Json(json!({"success": true})).into_response()
781}
782
/// One entry in the session list returned by `list_sessions`.
///
/// Field names are serialized in camelCase (e.g. `sessionType`, `isCurrent`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// Prefixed row id: `jwt:{id}` for `session_tokens` rows, `oauth:{id}` for
    /// `oauth_token` rows. `revoke_session` accepts the same format.
    pub id: String,
    /// `"legacy"` for JWT sessions, `"oauth"` for OAuth tokens.
    pub session_type: String,
    /// Display name derived from the OAuth client id; `None` for JWT sessions.
    pub client_name: Option<String>,
    /// Creation time as an RFC 3339 string.
    pub created_at: String,
    /// Expiry time as an RFC 3339 string.
    pub expires_at: String,
    /// Whether this entry corresponds to the token used for the request.
    pub is_current: bool,
}
793
/// Response body of `list_sessions`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    /// JWT and OAuth sessions combined, sorted by `created_at` descending.
    pub sessions: Vec<SessionInfo>,
}
799
800pub async fn list_sessions(
801 State(state): State<AppState>,
802 headers: HeaderMap,
803 auth: BearerAuth,
804) -> Response {
805 let current_jti = headers
806 .get("authorization")
807 .and_then(|v| v.to_str().ok())
808 .and_then(|v| v.strip_prefix("Bearer "))
809 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
810
811 let mut sessions: Vec<SessionInfo> = Vec::new();
812
813 let jwt_result = sqlx::query_as::<
814 _,
815 (
816 i32,
817 String,
818 chrono::DateTime<chrono::Utc>,
819 chrono::DateTime<chrono::Utc>,
820 ),
821 >(
822 r#"
823 SELECT id, access_jti, created_at, refresh_expires_at
824 FROM session_tokens
825 WHERE did = $1 AND refresh_expires_at > NOW()
826 ORDER BY created_at DESC
827 "#,
828 )
829 .bind(&auth.0.did)
830 .fetch_all(&state.db)
831 .await;
832
833 match jwt_result {
834 Ok(rows) => {
835 for (id, access_jti, created_at, expires_at) in rows {
836 sessions.push(SessionInfo {
837 id: format!("jwt:{}", id),
838 session_type: "legacy".to_string(),
839 client_name: None,
840 created_at: created_at.to_rfc3339(),
841 expires_at: expires_at.to_rfc3339(),
842 is_current: current_jti.as_ref() == Some(&access_jti),
843 });
844 }
845 }
846 Err(e) => {
847 error!("DB error fetching JWT sessions: {:?}", e);
848 return (
849 StatusCode::INTERNAL_SERVER_ERROR,
850 Json(json!({"error": "InternalError"})),
851 )
852 .into_response();
853 }
854 }
855
856 let oauth_result = sqlx::query_as::<
857 _,
858 (
859 i32,
860 String,
861 chrono::DateTime<chrono::Utc>,
862 chrono::DateTime<chrono::Utc>,
863 String,
864 ),
865 >(
866 r#"
867 SELECT id, token_id, created_at, expires_at, client_id
868 FROM oauth_token
869 WHERE did = $1 AND expires_at > NOW()
870 ORDER BY created_at DESC
871 "#,
872 )
873 .bind(&auth.0.did)
874 .fetch_all(&state.db)
875 .await;
876
877 match oauth_result {
878 Ok(rows) => {
879 for (id, token_id, created_at, expires_at, client_id) in rows {
880 let client_name = extract_client_name(&client_id);
881 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
882 sessions.push(SessionInfo {
883 id: format!("oauth:{}", id),
884 session_type: "oauth".to_string(),
885 client_name: Some(client_name),
886 created_at: created_at.to_rfc3339(),
887 expires_at: expires_at.to_rfc3339(),
888 is_current: is_current_oauth,
889 });
890 }
891 }
892 Err(e) => {
893 error!("DB error fetching OAuth sessions: {:?}", e);
894 return (
895 StatusCode::INTERNAL_SERVER_ERROR,
896 Json(json!({"error": "InternalError"})),
897 )
898 .into_response();
899 }
900 }
901
902 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
903
904 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
905}
906
907fn extract_client_name(client_id: &str) -> String {
908 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
909 "Localhost App".to_string()
910 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
911 parsed.host_str().unwrap_or("Unknown App").to_string()
912 } else {
913 client_id.to_string()
914 }
915}
916
/// Request body for `revoke_session`.
///
/// Field names are expected in camelCase (`sessionId`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Prefixed session id: `jwt:{id}` or `oauth:{id}`, where `{id}` is an
    /// integer row id.
    pub session_id: String,
}
922
923pub async fn revoke_session(
924 State(state): State<AppState>,
925 auth: BearerAuth,
926 Json(input): Json<RevokeSessionInput>,
927) -> Response {
928 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
929 let session_id: i32 = match jwt_id.parse() {
930 Ok(id) => id,
931 Err(_) => {
932 return (
933 StatusCode::BAD_REQUEST,
934 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
935 )
936 .into_response();
937 }
938 };
939 let session = sqlx::query_as::<_, (String,)>(
940 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
941 )
942 .bind(session_id)
943 .bind(&auth.0.did)
944 .fetch_optional(&state.db)
945 .await;
946 let access_jti = match session {
947 Ok(Some((jti,))) => jti,
948 Ok(None) => {
949 return (
950 StatusCode::NOT_FOUND,
951 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
952 )
953 .into_response();
954 }
955 Err(e) => {
956 error!("DB error in revoke_session: {:?}", e);
957 return (
958 StatusCode::INTERNAL_SERVER_ERROR,
959 Json(json!({"error": "InternalError"})),
960 )
961 .into_response();
962 }
963 };
964 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
965 .bind(session_id)
966 .execute(&state.db)
967 .await
968 {
969 error!("DB error deleting session: {:?}", e);
970 return (
971 StatusCode::INTERNAL_SERVER_ERROR,
972 Json(json!({"error": "InternalError"})),
973 )
974 .into_response();
975 }
976 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
977 if let Err(e) = state.cache.delete(&cache_key).await {
978 warn!("Failed to invalidate session cache: {:?}", e);
979 }
980 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
981 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
982 let session_id: i32 = match oauth_id.parse() {
983 Ok(id) => id,
984 Err(_) => {
985 return (
986 StatusCode::BAD_REQUEST,
987 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
988 )
989 .into_response();
990 }
991 };
992 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
993 .bind(session_id)
994 .bind(&auth.0.did)
995 .execute(&state.db)
996 .await;
997 match result {
998 Ok(r) if r.rows_affected() == 0 => {
999 return (
1000 StatusCode::NOT_FOUND,
1001 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1002 )
1003 .into_response();
1004 }
1005 Err(e) => {
1006 error!("DB error deleting OAuth session: {:?}", e);
1007 return (
1008 StatusCode::INTERNAL_SERVER_ERROR,
1009 Json(json!({"error": "InternalError"})),
1010 )
1011 .into_response();
1012 }
1013 _ => {}
1014 }
1015 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1016 } else {
1017 return (
1018 StatusCode::BAD_REQUEST,
1019 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1020 )
1021 .into_response();
1022 }
1023 (StatusCode::OK, Json(json!({}))).into_response()
1024}
1025
1026pub async fn revoke_all_sessions(
1027 State(state): State<AppState>,
1028 headers: HeaderMap,
1029 auth: BearerAuth,
1030) -> Response {
1031 let current_jti = headers
1032 .get("authorization")
1033 .and_then(|v| v.to_str().ok())
1034 .and_then(|v| v.strip_prefix("Bearer "))
1035 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1036
1037 if let Some(ref jti) = current_jti {
1038 if auth.0.is_oauth {
1039 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1040 .bind(&auth.0.did)
1041 .execute(&state.db)
1042 .await
1043 {
1044 error!("DB error revoking JWT sessions: {:?}", e);
1045 return (
1046 StatusCode::INTERNAL_SERVER_ERROR,
1047 Json(json!({"error": "InternalError"})),
1048 )
1049 .into_response();
1050 }
1051 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1052 .bind(&auth.0.did)
1053 .bind(jti)
1054 .execute(&state.db)
1055 .await
1056 {
1057 error!("DB error revoking OAuth sessions: {:?}", e);
1058 return (
1059 StatusCode::INTERNAL_SERVER_ERROR,
1060 Json(json!({"error": "InternalError"})),
1061 )
1062 .into_response();
1063 }
1064 } else {
1065 if let Err(e) =
1066 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1067 .bind(&auth.0.did)
1068 .bind(jti)
1069 .execute(&state.db)
1070 .await
1071 {
1072 error!("DB error revoking JWT sessions: {:?}", e);
1073 return (
1074 StatusCode::INTERNAL_SERVER_ERROR,
1075 Json(json!({"error": "InternalError"})),
1076 )
1077 .into_response();
1078 }
1079 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1080 .bind(&auth.0.did)
1081 .execute(&state.db)
1082 .await
1083 {
1084 error!("DB error revoking OAuth sessions: {:?}", e);
1085 return (
1086 StatusCode::INTERNAL_SERVER_ERROR,
1087 Json(json!({"error": "InternalError"})),
1088 )
1089 .into_response();
1090 }
1091 }
1092 } else {
1093 return (
1094 StatusCode::BAD_REQUEST,
1095 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1096 )
1097 .into_response();
1098 }
1099
1100 info!(did = %auth.0.did, "All other sessions revoked");
1101 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1102}
1103
/// JSON response body returned by `get_legacy_login_preference`.
///
/// Field names are serialized in camelCase (`allowLegacyLogin`, `hasMfa`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    /// Current value of the account's `users.allow_legacy_login` column.
    pub allow_legacy_login: bool,
    /// `true` when the account has a verified `user_totp` row or at least one
    /// `passkeys` row, as computed by the handler's query.
    pub has_mfa: bool,
}
1110
1111pub async fn get_legacy_login_preference(
1112 State(state): State<AppState>,
1113 auth: BearerAuth,
1114) -> Response {
1115 let result = sqlx::query!(
1116 r#"SELECT
1117 u.allow_legacy_login,
1118 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1119 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1120 FROM users u WHERE u.did = $1"#,
1121 auth.0.did
1122 )
1123 .fetch_optional(&state.db)
1124 .await;
1125
1126 match result {
1127 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1128 allow_legacy_login: row.allow_legacy_login,
1129 has_mfa: row.has_mfa,
1130 })
1131 .into_response(),
1132 Ok(None) => (
1133 StatusCode::NOT_FOUND,
1134 Json(json!({"error": "AccountNotFound"})),
1135 )
1136 .into_response(),
1137 Err(e) => {
1138 error!("DB error: {:?}", e);
1139 (
1140 StatusCode::INTERNAL_SERVER_ERROR,
1141 Json(json!({"error": "InternalError"})),
1142 )
1143 .into_response()
1144 }
1145 }
1146}
1147
/// JSON request body accepted by `update_legacy_login_preference`.
///
/// Field names are deserialized from camelCase (`allowLegacyLogin`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLegacyLoginInput {
    /// New value to store in the account's `users.allow_legacy_login` column.
    pub allow_legacy_login: bool,
}
1153
1154pub async fn update_legacy_login_preference(
1155 State(state): State<AppState>,
1156 auth: BearerAuth,
1157 Json(input): Json<UpdateLegacyLoginInput>,
1158) -> Response {
1159 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1160 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1161 .await;
1162 }
1163
1164 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1165 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1166 }
1167
1168 let result = sqlx::query!(
1169 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1170 input.allow_legacy_login,
1171 auth.0.did
1172 )
1173 .fetch_optional(&state.db)
1174 .await;
1175
1176 match result {
1177 Ok(Some(_)) => {
1178 info!(
1179 did = %auth.0.did,
1180 allow_legacy_login = input.allow_legacy_login,
1181 "Legacy login preference updated"
1182 );
1183 Json(json!({
1184 "allowLegacyLogin": input.allow_legacy_login
1185 }))
1186 .into_response()
1187 }
1188 Ok(None) => (
1189 StatusCode::NOT_FOUND,
1190 Json(json!({"error": "AccountNotFound"})),
1191 )
1192 .into_response(),
1193 Err(e) => {
1194 error!("DB error: {:?}", e);
1195 (
1196 StatusCode::INTERNAL_SERVER_ERROR,
1197 Json(json!({"error": "InternalError"})),
1198 )
1199 .into_response()
1200 }
1201 }
1202}
1203
1204use crate::comms::locale::VALID_LOCALES;
1205
/// JSON request body accepted by `update_locale`.
///
/// Field names are deserialized from camelCase (`preferredLocale`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLocaleInput {
    /// Requested locale; the handler rejects values not listed in
    /// `VALID_LOCALES` and otherwise stores it in `users.preferred_locale`.
    pub preferred_locale: String,
}
1211
1212pub async fn update_locale(
1213 State(state): State<AppState>,
1214 auth: BearerAuth,
1215 Json(input): Json<UpdateLocaleInput>,
1216) -> Response {
1217 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1218 return (
1219 StatusCode::BAD_REQUEST,
1220 Json(json!({
1221 "error": "InvalidRequest",
1222 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1223 })),
1224 )
1225 .into_response();
1226 }
1227
1228 let result = sqlx::query!(
1229 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1230 input.preferred_locale,
1231 auth.0.did
1232 )
1233 .fetch_optional(&state.db)
1234 .await;
1235
1236 match result {
1237 Ok(Some(_)) => {
1238 info!(
1239 did = %auth.0.did,
1240 locale = %input.preferred_locale,
1241 "User locale preference updated"
1242 );
1243 Json(json!({
1244 "preferredLocale": input.preferred_locale
1245 }))
1246 .into_response()
1247 }
1248 Ok(None) => (
1249 StatusCode::NOT_FOUND,
1250 Json(json!({"error": "AccountNotFound"})),
1251 )
1252 .into_response(),
1253 Err(e) => {
1254 error!("DB error updating locale: {:?}", e);
1255 (
1256 StatusCode::INTERNAL_SERVER_ERROR,
1257 Json(json!({"error": "InternalError"})),
1258 )
1259 .into_response()
1260 }
1261 }
1262}