this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
/// Canonicalises a login identifier before it is matched against stored users.
///
/// Surrounding whitespace is always removed. Identifiers that contain `@` or
/// start with `did:` are otherwise returned untouched. Everything else is
/// lower-cased, and a name without any `.` additionally gets `.{pds_hostname}`
/// appended.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();

    let keep_verbatim = trimmed.contains('@') || trimmed.starts_with("did:");
    if keep_verbatim {
        return trimmed.to_owned();
    }

    let lowered = trimmed.to_lowercase();
    if trimmed.contains('.') {
        lowered
    } else {
        format!("{}.{}", lowered, pds_hostname)
    }
}
40
/// Returns the handle exactly as it is stored.
///
/// The hostname argument is accepted for call-site compatibility but is not used.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
44
/// JSON request body accepted by `create_session`.
#[derive(Deserialize)]
pub struct CreateSessionInput {
    /// Login identifier; after normalization it is matched against the user's
    /// handle, email, or DID.
    pub identifier: String,
    /// Plain-text secret, checked against the account password hash and then
    /// against the account's app passwords.
    pub password: String,
}
50
/// JSON response returned by a successful `create_session`; keys are
/// serialized in camelCase (`accessJwt`, `refreshJwt`, ...).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    /// Newly minted access token.
    pub access_jwt: String,
    /// Newly minted refresh token.
    pub refresh_jwt: String,
    /// The user's handle as stored in the database.
    pub handle: String,
    /// The user's DID.
    pub did: String,
}
59
60pub async fn create_session(
61 State(state): State<AppState>,
62 headers: HeaderMap,
63 Json(input): Json<CreateSessionInput>,
64) -> Response {
65 info!(
66 "create_session called with identifier: {}",
67 input.identifier
68 );
69 let client_ip = extract_client_ip(&headers);
70 if !state
71 .check_rate_limit(RateLimitKind::Login, &client_ip)
72 .await
73 {
74 warn!(ip = %client_ip, "Login rate limit exceeded");
75 return (
76 StatusCode::TOO_MANY_REQUESTS,
77 Json(json!({
78 "error": "RateLimitExceeded",
79 "message": "Too many login attempts. Please try again later."
80 })),
81 )
82 .into_response();
83 }
84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
86 info!(
87 "Normalized identifier: {} -> {}",
88 input.identifier, normalized_identifier
89 );
90 let row = match sqlx::query!(
91 r#"SELECT
92 u.id, u.did, u.handle, u.password_hash,
93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
94 u.allow_legacy_login,
95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
96 k.key_bytes, k.encryption_version,
97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
98 FROM users u
99 JOIN user_keys k ON u.id = k.user_id
100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
101 normalized_identifier
102 )
103 .fetch_optional(&state.db)
104 .await
105 {
106 Ok(Some(row)) => row,
107 Ok(None) => {
108 let _ = verify(
109 &input.password,
110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
111 );
112 warn!("User not found for login attempt");
113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
114 .into_response();
115 }
116 Err(e) => {
117 error!("Database error fetching user: {:?}", e);
118 return ApiError::InternalError.into_response();
119 }
120 };
121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
122 Ok(k) => k,
123 Err(e) => {
124 error!("Failed to decrypt user key: {:?}", e);
125 return ApiError::InternalError.into_response();
126 }
127 };
128 let password_valid = if row
129 .password_hash
130 .as_ref()
131 .map(|h| verify(&input.password, h).unwrap_or(false))
132 .unwrap_or(false)
133 {
134 true
135 } else {
136 let app_passwords = sqlx::query!(
137 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
138 row.id
139 )
140 .fetch_all(&state.db)
141 .await
142 .unwrap_or_default();
143 app_passwords
144 .iter()
145 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
146 };
147 if !password_valid {
148 warn!("Password verification failed for login attempt");
149 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
150 .into_response();
151 }
152 let is_verified =
153 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
154 if !is_verified {
155 warn!("Login attempt for unverified account: {}", row.did);
156 return (
157 StatusCode::FORBIDDEN,
158 Json(json!({
159 "error": "AccountNotVerified",
160 "message": "Please verify your account before logging in",
161 "did": row.did
162 })),
163 )
164 .into_response();
165 }
166 let has_totp = row.totp_enabled.unwrap_or(false);
167 let is_legacy_login = has_totp;
168 if has_totp && !row.allow_legacy_login {
169 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
170 return (
171 StatusCode::FORBIDDEN,
172 Json(json!({
173 "error": "MfaRequired",
174 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
175 "did": row.did
176 })),
177 )
178 .into_response();
179 }
180 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
181 Ok(m) => m,
182 Err(e) => {
183 error!("Failed to create access token: {:?}", e);
184 return ApiError::InternalError.into_response();
185 }
186 };
187 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
188 Ok(m) => m,
189 Err(e) => {
190 error!("Failed to create refresh token: {:?}", e);
191 return ApiError::InternalError.into_response();
192 }
193 };
194 if let Err(e) = sqlx::query!(
195 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
196 row.did,
197 access_meta.jti,
198 refresh_meta.jti,
199 access_meta.expires_at,
200 refresh_meta.expires_at,
201 is_legacy_login,
202 false
203 )
204 .execute(&state.db)
205 .await
206 {
207 error!("Failed to insert session: {:?}", e);
208 return ApiError::InternalError.into_response();
209 }
210 if is_legacy_login {
211 warn!(
212 did = %row.did,
213 ip = %client_ip,
214 "Legacy login on TOTP-enabled account - sending notification"
215 );
216 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
217 if let Err(e) = crate::comms::queue_legacy_login_notification(
218 &state.db,
219 row.id,
220 &hostname,
221 &client_ip,
222 row.preferred_comms_channel,
223 )
224 .await
225 {
226 error!("Failed to queue legacy login notification: {:?}", e);
227 }
228 }
229 let handle = full_handle(&row.handle, &pds_hostname);
230 Json(CreateSessionOutput {
231 access_jwt: access_meta.token,
232 refresh_jwt: refresh_meta.token,
233 handle,
234 did: row.did,
235 })
236 .into_response()
237}
238
239pub async fn get_session(
240 State(state): State<AppState>,
241 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
242) -> Response {
243 let permissions = auth_user.permissions();
244 let can_read_email = permissions.allows_email_read();
245
246 match sqlx::query!(
247 r#"SELECT
248 handle, email, email_verified, is_admin, deactivated_at, preferred_locale,
249 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
250 discord_verified, telegram_verified, signal_verified
251 FROM users WHERE did = $1"#,
252 auth_user.did
253 )
254 .fetch_optional(&state.db)
255 .await
256 {
257 Ok(Some(row)) => {
258 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
259 crate::comms::CommsChannel::Email => ("email", row.email_verified),
260 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
261 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
262 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
263 };
264 let pds_hostname =
265 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
266 let handle = full_handle(&row.handle, &pds_hostname);
267 let is_active = row.deactivated_at.is_none();
268 let email_value = if can_read_email {
269 row.email.clone()
270 } else {
271 None
272 };
273 let email_verified_value = can_read_email && row.email_verified;
274 Json(json!({
275 "handle": handle,
276 "did": auth_user.did,
277 "email": email_value,
278 "emailVerified": email_verified_value,
279 "preferredChannel": preferred_channel,
280 "preferredChannelVerified": preferred_channel_verified,
281 "preferredLocale": row.preferred_locale,
282 "isAdmin": row.is_admin,
283 "active": is_active,
284 "status": if is_active { "active" } else { "deactivated" },
285 "didDoc": {}
286 }))
287 .into_response()
288 }
289 Ok(None) => ApiError::AuthenticationFailed.into_response(),
290 Err(e) => {
291 error!("Database error in get_session: {:?}", e);
292 ApiError::InternalError.into_response()
293 }
294 }
295}
296
297pub async fn delete_session(
298 State(state): State<AppState>,
299 headers: axum::http::HeaderMap,
300) -> Response {
301 let token = match crate::auth::extract_bearer_token_from_header(
302 headers.get("Authorization").and_then(|h| h.to_str().ok()),
303 ) {
304 Some(t) => t,
305 None => return ApiError::AuthenticationRequired.into_response(),
306 };
307 let jti = match crate::auth::get_jti_from_token(&token) {
308 Ok(jti) => jti,
309 Err(_) => return ApiError::AuthenticationFailed.into_response(),
310 };
311 let did = crate::auth::get_did_from_token(&token).ok();
312 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
313 .execute(&state.db)
314 .await
315 {
316 Ok(res) if res.rows_affected() > 0 => {
317 if let Some(did) = did {
318 let session_cache_key = format!("auth:session:{}:{}", did, jti);
319 let _ = state.cache.delete(&session_cache_key).await;
320 }
321 Json(json!({})).into_response()
322 }
323 Ok(_) => ApiError::AuthenticationFailed.into_response(),
324 Err(e) => {
325 error!("Database error in delete_session: {:?}", e);
326 ApiError::AuthenticationFailed.into_response()
327 }
328 }
329}
330
331pub async fn refresh_session(
332 State(state): State<AppState>,
333 headers: axum::http::HeaderMap,
334) -> Response {
335 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
336 if !state
337 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
338 .await
339 {
340 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
341 return (
342 axum::http::StatusCode::TOO_MANY_REQUESTS,
343 axum::Json(serde_json::json!({
344 "error": "RateLimitExceeded",
345 "message": "Too many requests. Please try again later."
346 })),
347 )
348 .into_response();
349 }
350 let refresh_token = match crate::auth::extract_bearer_token_from_header(
351 headers.get("Authorization").and_then(|h| h.to_str().ok()),
352 ) {
353 Some(t) => t,
354 None => return ApiError::AuthenticationRequired.into_response(),
355 };
356 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
357 Ok(jti) => jti,
358 Err(_) => {
359 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
360 .into_response();
361 }
362 };
363 let mut tx = match state.db.begin().await {
364 Ok(tx) => tx,
365 Err(e) => {
366 error!("Failed to begin transaction: {:?}", e);
367 return ApiError::InternalError.into_response();
368 }
369 };
370 if let Ok(Some(session_id)) = sqlx::query_scalar!(
371 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
372 refresh_jti
373 )
374 .fetch_optional(&mut *tx)
375 .await
376 {
377 warn!(
378 "Refresh token reuse detected! Revoking token family for session_id: {}",
379 session_id
380 );
381 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
382 .execute(&mut *tx)
383 .await;
384 let _ = tx.commit().await;
385 return ApiError::ExpiredTokenMsg(
386 "Refresh token has been revoked due to suspected compromise".into(),
387 )
388 .into_response();
389 }
390 let session_row = match sqlx::query!(
391 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
392 FROM session_tokens st
393 JOIN users u ON st.did = u.did
394 JOIN user_keys k ON u.id = k.user_id
395 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
396 FOR UPDATE OF st"#,
397 refresh_jti
398 )
399 .fetch_optional(&mut *tx)
400 .await
401 {
402 Ok(Some(row)) => row,
403 Ok(None) => {
404 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
405 .into_response();
406 }
407 Err(e) => {
408 error!("Database error fetching session: {:?}", e);
409 return ApiError::InternalError.into_response();
410 }
411 };
412 let key_bytes =
413 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
414 Ok(k) => k,
415 Err(e) => {
416 error!("Failed to decrypt user key: {:?}", e);
417 return ApiError::InternalError.into_response();
418 }
419 };
420 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
421 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
422 }
423 let new_access_meta =
424 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
425 Ok(m) => m,
426 Err(e) => {
427 error!("Failed to create access token: {:?}", e);
428 return ApiError::InternalError.into_response();
429 }
430 };
431 let new_refresh_meta =
432 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
433 Ok(m) => m,
434 Err(e) => {
435 error!("Failed to create refresh token: {:?}", e);
436 return ApiError::InternalError.into_response();
437 }
438 };
439 match sqlx::query!(
440 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
441 refresh_jti,
442 session_row.id
443 )
444 .execute(&mut *tx)
445 .await
446 {
447 Ok(result) if result.rows_affected() == 0 => {
448 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
449 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
450 .execute(&mut *tx)
451 .await;
452 let _ = tx.commit().await;
453 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
454 }
455 Err(e) => {
456 error!("Failed to record used refresh token: {:?}", e);
457 return ApiError::InternalError.into_response();
458 }
459 Ok(_) => {}
460 }
461 if let Err(e) = sqlx::query!(
462 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
463 new_access_meta.jti,
464 new_refresh_meta.jti,
465 new_access_meta.expires_at,
466 new_refresh_meta.expires_at,
467 session_row.id
468 )
469 .execute(&mut *tx)
470 .await
471 {
472 error!("Database error updating session: {:?}", e);
473 return ApiError::InternalError.into_response();
474 }
475 if let Err(e) = tx.commit().await {
476 error!("Failed to commit transaction: {:?}", e);
477 return ApiError::InternalError.into_response();
478 }
479 match sqlx::query!(
480 r#"SELECT
481 handle, email, email_verified, is_admin, preferred_locale,
482 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
483 discord_verified, telegram_verified, signal_verified
484 FROM users WHERE did = $1"#,
485 session_row.did
486 )
487 .fetch_optional(&state.db)
488 .await
489 {
490 Ok(Some(u)) => {
491 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
492 crate::comms::CommsChannel::Email => ("email", u.email_verified),
493 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
494 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
495 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
496 };
497 let pds_hostname =
498 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
499 let handle = full_handle(&u.handle, &pds_hostname);
500 Json(json!({
501 "accessJwt": new_access_meta.token,
502 "refreshJwt": new_refresh_meta.token,
503 "handle": handle,
504 "did": session_row.did,
505 "email": u.email,
506 "emailVerified": u.email_verified,
507 "preferredChannel": preferred_channel,
508 "preferredChannelVerified": preferred_channel_verified,
509 "preferredLocale": u.preferred_locale,
510 "isAdmin": u.is_admin,
511 "active": true
512 }))
513 .into_response()
514 }
515 Ok(None) => {
516 error!("User not found for existing session: {}", session_row.did);
517 ApiError::InternalError.into_response()
518 }
519 Err(e) => {
520 error!("Database error fetching user: {:?}", e);
521 ApiError::InternalError.into_response()
522 }
523 }
524}
525
/// JSON request body for `confirm_signup`; keys are read in camelCase
/// (`did`, `verificationCode`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    /// DID of the account being confirmed.
    pub did: String,
    /// Verification code as entered by the user; it is normalized before being checked.
    pub verification_code: String,
}
532
/// JSON response of a successful `confirm_signup`; keys are serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    /// Access token for the session created on confirmation.
    pub access_jwt: String,
    /// Refresh token for the session created on confirmation.
    pub refresh_jwt: String,
    /// The user's stored handle.
    pub handle: String,
    /// The user's DID.
    pub did: String,
    /// The user's email address, if one is stored.
    pub email: Option<String>,
    /// `true` when the channel that was just confirmed is email.
    pub email_verified: bool,
    /// Wire name of the preferred channel: `email`, `discord`, `telegram`, or `signal`.
    pub preferred_channel: String,
    /// Always `true` in the response built by `confirm_signup`.
    pub preferred_channel_verified: bool,
}
545
546pub async fn confirm_signup(
547 State(state): State<AppState>,
548 Json(input): Json<ConfirmSignupInput>,
549) -> Response {
550 info!("confirm_signup called for DID: {}", input.did);
551 let row = match sqlx::query!(
552 r#"SELECT
553 u.id, u.did, u.handle, u.email,
554 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
555 u.discord_id, u.telegram_username, u.signal_number,
556 k.key_bytes, k.encryption_version
557 FROM users u
558 JOIN user_keys k ON u.id = k.user_id
559 WHERE u.did = $1"#,
560 input.did
561 )
562 .fetch_optional(&state.db)
563 .await
564 {
565 Ok(Some(row)) => row,
566 Ok(None) => {
567 warn!("User not found for confirm_signup: {}", input.did);
568 return ApiError::InvalidRequest("Invalid DID or verification code".into())
569 .into_response();
570 }
571 Err(e) => {
572 error!("Database error in confirm_signup: {:?}", e);
573 return ApiError::InternalError.into_response();
574 }
575 };
576
577 let (channel_str, identifier) = match row.channel {
578 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
579 crate::comms::CommsChannel::Discord => {
580 ("discord", row.discord_id.clone().unwrap_or_default())
581 }
582 crate::comms::CommsChannel::Telegram => (
583 "telegram",
584 row.telegram_username.clone().unwrap_or_default(),
585 ),
586 crate::comms::CommsChannel::Signal => {
587 ("signal", row.signal_number.clone().unwrap_or_default())
588 }
589 };
590
591 let normalized_token =
592 crate::auth::verification_token::normalize_token_input(&input.verification_code);
593 match crate::auth::verification_token::verify_signup_token(
594 &normalized_token,
595 channel_str,
596 &identifier,
597 ) {
598 Ok(token_data) => {
599 if token_data.did != input.did {
600 warn!(
601 "Token DID mismatch for confirm_signup: expected {}, got {}",
602 input.did, token_data.did
603 );
604 return ApiError::InvalidRequest("Invalid verification code".into())
605 .into_response();
606 }
607 }
608 Err(crate::auth::verification_token::VerifyError::Expired) => {
609 warn!("Verification code expired for user: {}", input.did);
610 return ApiError::ExpiredTokenMsg("Verification code has expired".into())
611 .into_response();
612 }
613 Err(e) => {
614 warn!("Invalid verification code for user {}: {:?}", input.did, e);
615 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
616 }
617 }
618
619 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
620 Ok(k) => k,
621 Err(e) => {
622 error!("Failed to decrypt user key: {:?}", e);
623 return ApiError::InternalError.into_response();
624 }
625 };
626 let verified_column = match row.channel {
627 crate::comms::CommsChannel::Email => "email_verified",
628 crate::comms::CommsChannel::Discord => "discord_verified",
629 crate::comms::CommsChannel::Telegram => "telegram_verified",
630 crate::comms::CommsChannel::Signal => "signal_verified",
631 };
632 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
633 if let Err(e) = sqlx::query(&update_query)
634 .bind(&input.did)
635 .execute(&state.db)
636 .await
637 {
638 error!("Failed to update verification status: {:?}", e);
639 return ApiError::InternalError.into_response();
640 }
641
642 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
643 Ok(m) => m,
644 Err(e) => {
645 error!("Failed to create access token: {:?}", e);
646 return ApiError::InternalError.into_response();
647 }
648 };
649 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
650 Ok(m) => m,
651 Err(e) => {
652 error!("Failed to create refresh token: {:?}", e);
653 return ApiError::InternalError.into_response();
654 }
655 };
656 if let Err(e) = sqlx::query!(
657 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
658 row.did,
659 access_meta.jti,
660 refresh_meta.jti,
661 access_meta.expires_at,
662 refresh_meta.expires_at,
663 false,
664 false
665 )
666 .execute(&state.db)
667 .await
668 {
669 error!("Failed to insert session: {:?}", e);
670 return ApiError::InternalError.into_response();
671 }
672 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
673 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
674 warn!("Failed to enqueue welcome notification: {:?}", e);
675 }
676 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
677 let preferred_channel = match row.channel {
678 crate::comms::CommsChannel::Email => "email",
679 crate::comms::CommsChannel::Discord => "discord",
680 crate::comms::CommsChannel::Telegram => "telegram",
681 crate::comms::CommsChannel::Signal => "signal",
682 };
683 Json(ConfirmSignupOutput {
684 access_jwt: access_meta.token,
685 refresh_jwt: refresh_meta.token,
686 handle: row.handle,
687 did: row.did,
688 email: row.email,
689 email_verified,
690 preferred_channel: preferred_channel.to_string(),
691 preferred_channel_verified: true,
692 })
693 .into_response()
694}
695
/// JSON request body for `resend_verification`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    /// DID of the account that should receive a fresh verification code.
    pub did: String,
}
701
702pub async fn resend_verification(
703 State(state): State<AppState>,
704 Json(input): Json<ResendVerificationInput>,
705) -> Response {
706 info!("resend_verification called for DID: {}", input.did);
707 let row = match sqlx::query!(
708 r#"SELECT
709 id, handle, email,
710 preferred_comms_channel as "channel: crate::comms::CommsChannel",
711 discord_id, telegram_username, signal_number,
712 email_verified, discord_verified, telegram_verified, signal_verified
713 FROM users
714 WHERE did = $1"#,
715 input.did
716 )
717 .fetch_optional(&state.db)
718 .await
719 {
720 Ok(Some(row)) => row,
721 Ok(None) => {
722 return ApiError::InvalidRequest("User not found".into()).into_response();
723 }
724 Err(e) => {
725 error!("Database error in resend_verification: {:?}", e);
726 return ApiError::InternalError.into_response();
727 }
728 };
729 let is_verified =
730 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
731 if is_verified {
732 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
733 }
734
735 let (channel_str, recipient) = match row.channel {
736 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
737 crate::comms::CommsChannel::Discord => {
738 ("discord", row.discord_id.clone().unwrap_or_default())
739 }
740 crate::comms::CommsChannel::Telegram => (
741 "telegram",
742 row.telegram_username.clone().unwrap_or_default(),
743 ),
744 crate::comms::CommsChannel::Signal => {
745 ("signal", row.signal_number.clone().unwrap_or_default())
746 }
747 };
748
749 let verification_token =
750 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
751 let formatted_token =
752 crate::auth::verification_token::format_token_for_display(&verification_token);
753
754 if let Err(e) = crate::comms::enqueue_signup_verification(
755 &state.db,
756 row.id,
757 channel_str,
758 &recipient,
759 &formatted_token,
760 None,
761 )
762 .await
763 {
764 warn!("Failed to enqueue verification notification: {:?}", e);
765 }
766 Json(json!({"success": true})).into_response()
767}
768
/// One entry in the `list_sessions` response; keys are serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// Prefixed row id: `jwt:{id}` for `session_tokens` rows, `oauth:{id}` for
    /// `oauth_token` rows. This is the value `revoke_session` expects.
    pub id: String,
    /// `"legacy"` for JWT sessions, `"oauth"` for OAuth tokens.
    pub session_type: String,
    /// Display name derived from the OAuth client id; `None` for JWT sessions.
    pub client_name: Option<String>,
    /// Creation time formatted as RFC 3339.
    pub created_at: String,
    /// Expiry time formatted as RFC 3339 (the refresh expiry for JWT sessions).
    pub expires_at: String,
    /// Whether this entry corresponds to the token used for the current request.
    pub is_current: bool,
}
779
/// JSON response body of `list_sessions`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    /// JWT and OAuth sessions combined, ordered newest first by `created_at`.
    pub sessions: Vec<SessionInfo>,
}
785
786pub async fn list_sessions(
787 State(state): State<AppState>,
788 headers: HeaderMap,
789 auth: BearerAuth,
790) -> Response {
791 let current_jti = headers
792 .get("authorization")
793 .and_then(|v| v.to_str().ok())
794 .and_then(|v| v.strip_prefix("Bearer "))
795 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
796
797 let mut sessions: Vec<SessionInfo> = Vec::new();
798
799 let jwt_result = sqlx::query_as::<
800 _,
801 (
802 i32,
803 String,
804 chrono::DateTime<chrono::Utc>,
805 chrono::DateTime<chrono::Utc>,
806 ),
807 >(
808 r#"
809 SELECT id, access_jti, created_at, refresh_expires_at
810 FROM session_tokens
811 WHERE did = $1 AND refresh_expires_at > NOW()
812 ORDER BY created_at DESC
813 "#,
814 )
815 .bind(&auth.0.did)
816 .fetch_all(&state.db)
817 .await;
818
819 match jwt_result {
820 Ok(rows) => {
821 for (id, access_jti, created_at, expires_at) in rows {
822 sessions.push(SessionInfo {
823 id: format!("jwt:{}", id),
824 session_type: "legacy".to_string(),
825 client_name: None,
826 created_at: created_at.to_rfc3339(),
827 expires_at: expires_at.to_rfc3339(),
828 is_current: current_jti.as_ref() == Some(&access_jti),
829 });
830 }
831 }
832 Err(e) => {
833 error!("DB error fetching JWT sessions: {:?}", e);
834 return (
835 StatusCode::INTERNAL_SERVER_ERROR,
836 Json(json!({"error": "InternalError"})),
837 )
838 .into_response();
839 }
840 }
841
842 let oauth_result = sqlx::query_as::<
843 _,
844 (
845 i32,
846 String,
847 chrono::DateTime<chrono::Utc>,
848 chrono::DateTime<chrono::Utc>,
849 String,
850 ),
851 >(
852 r#"
853 SELECT id, token_id, created_at, expires_at, client_id
854 FROM oauth_token
855 WHERE did = $1 AND expires_at > NOW()
856 ORDER BY created_at DESC
857 "#,
858 )
859 .bind(&auth.0.did)
860 .fetch_all(&state.db)
861 .await;
862
863 match oauth_result {
864 Ok(rows) => {
865 for (id, token_id, created_at, expires_at, client_id) in rows {
866 let client_name = extract_client_name(&client_id);
867 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
868 sessions.push(SessionInfo {
869 id: format!("oauth:{}", id),
870 session_type: "oauth".to_string(),
871 client_name: Some(client_name),
872 created_at: created_at.to_rfc3339(),
873 expires_at: expires_at.to_rfc3339(),
874 is_current: is_current_oauth,
875 });
876 }
877 }
878 Err(e) => {
879 error!("DB error fetching OAuth sessions: {:?}", e);
880 return (
881 StatusCode::INTERNAL_SERVER_ERROR,
882 Json(json!({"error": "InternalError"})),
883 )
884 .into_response();
885 }
886 }
887
888 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
889
890 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
891}
892
893fn extract_client_name(client_id: &str) -> String {
894 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
895 "Localhost App".to_string()
896 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
897 parsed.host_str().unwrap_or("Unknown App").to_string()
898 } else {
899 client_id.to_string()
900 }
901}
902
/// JSON request body for `revoke_session`; the key is read as `sessionId`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Prefixed session identifier: `jwt:{id}` or `oauth:{id}`, where `{id}` is
    /// an integer row id.
    pub session_id: String,
}
908
909pub async fn revoke_session(
910 State(state): State<AppState>,
911 auth: BearerAuth,
912 Json(input): Json<RevokeSessionInput>,
913) -> Response {
914 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
915 let session_id: i32 = match jwt_id.parse() {
916 Ok(id) => id,
917 Err(_) => {
918 return (
919 StatusCode::BAD_REQUEST,
920 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
921 )
922 .into_response();
923 }
924 };
925 let session = sqlx::query_as::<_, (String,)>(
926 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
927 )
928 .bind(session_id)
929 .bind(&auth.0.did)
930 .fetch_optional(&state.db)
931 .await;
932 let access_jti = match session {
933 Ok(Some((jti,))) => jti,
934 Ok(None) => {
935 return (
936 StatusCode::NOT_FOUND,
937 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
938 )
939 .into_response();
940 }
941 Err(e) => {
942 error!("DB error in revoke_session: {:?}", e);
943 return (
944 StatusCode::INTERNAL_SERVER_ERROR,
945 Json(json!({"error": "InternalError"})),
946 )
947 .into_response();
948 }
949 };
950 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
951 .bind(session_id)
952 .execute(&state.db)
953 .await
954 {
955 error!("DB error deleting session: {:?}", e);
956 return (
957 StatusCode::INTERNAL_SERVER_ERROR,
958 Json(json!({"error": "InternalError"})),
959 )
960 .into_response();
961 }
962 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
963 if let Err(e) = state.cache.delete(&cache_key).await {
964 warn!("Failed to invalidate session cache: {:?}", e);
965 }
966 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
967 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
968 let session_id: i32 = match oauth_id.parse() {
969 Ok(id) => id,
970 Err(_) => {
971 return (
972 StatusCode::BAD_REQUEST,
973 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
974 )
975 .into_response();
976 }
977 };
978 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
979 .bind(session_id)
980 .bind(&auth.0.did)
981 .execute(&state.db)
982 .await;
983 match result {
984 Ok(r) if r.rows_affected() == 0 => {
985 return (
986 StatusCode::NOT_FOUND,
987 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
988 )
989 .into_response();
990 }
991 Err(e) => {
992 error!("DB error deleting OAuth session: {:?}", e);
993 return (
994 StatusCode::INTERNAL_SERVER_ERROR,
995 Json(json!({"error": "InternalError"})),
996 )
997 .into_response();
998 }
999 _ => {}
1000 }
1001 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1002 } else {
1003 return (
1004 StatusCode::BAD_REQUEST,
1005 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1006 )
1007 .into_response();
1008 }
1009 (StatusCode::OK, Json(json!({}))).into_response()
1010}
1011
1012pub async fn revoke_all_sessions(
1013 State(state): State<AppState>,
1014 headers: HeaderMap,
1015 auth: BearerAuth,
1016) -> Response {
1017 let current_jti = headers
1018 .get("authorization")
1019 .and_then(|v| v.to_str().ok())
1020 .and_then(|v| v.strip_prefix("Bearer "))
1021 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1022
1023 if let Some(ref jti) = current_jti {
1024 if auth.0.is_oauth {
1025 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1026 .bind(&auth.0.did)
1027 .execute(&state.db)
1028 .await
1029 {
1030 error!("DB error revoking JWT sessions: {:?}", e);
1031 return (
1032 StatusCode::INTERNAL_SERVER_ERROR,
1033 Json(json!({"error": "InternalError"})),
1034 )
1035 .into_response();
1036 }
1037 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1038 .bind(&auth.0.did)
1039 .bind(jti)
1040 .execute(&state.db)
1041 .await
1042 {
1043 error!("DB error revoking OAuth sessions: {:?}", e);
1044 return (
1045 StatusCode::INTERNAL_SERVER_ERROR,
1046 Json(json!({"error": "InternalError"})),
1047 )
1048 .into_response();
1049 }
1050 } else {
1051 if let Err(e) =
1052 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1053 .bind(&auth.0.did)
1054 .bind(jti)
1055 .execute(&state.db)
1056 .await
1057 {
1058 error!("DB error revoking JWT sessions: {:?}", e);
1059 return (
1060 StatusCode::INTERNAL_SERVER_ERROR,
1061 Json(json!({"error": "InternalError"})),
1062 )
1063 .into_response();
1064 }
1065 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1066 .bind(&auth.0.did)
1067 .execute(&state.db)
1068 .await
1069 {
1070 error!("DB error revoking OAuth sessions: {:?}", e);
1071 return (
1072 StatusCode::INTERNAL_SERVER_ERROR,
1073 Json(json!({"error": "InternalError"})),
1074 )
1075 .into_response();
1076 }
1077 }
1078 } else {
1079 return (
1080 StatusCode::BAD_REQUEST,
1081 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1082 )
1083 .into_response();
1084 }
1085
1086 info!(did = %auth.0.did, "All other sessions revoked");
1087 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1088}
1089
1090#[derive(Serialize)]
1091#[serde(rename_all = "camelCase")]
1092pub struct LegacyLoginPreferenceOutput {
1093 pub allow_legacy_login: bool,
1094 pub has_mfa: bool,
1095}
1096
1097pub async fn get_legacy_login_preference(
1098 State(state): State<AppState>,
1099 auth: BearerAuth,
1100) -> Response {
1101 let result = sqlx::query!(
1102 r#"SELECT
1103 u.allow_legacy_login,
1104 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1105 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1106 FROM users u WHERE u.did = $1"#,
1107 auth.0.did
1108 )
1109 .fetch_optional(&state.db)
1110 .await;
1111
1112 match result {
1113 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1114 allow_legacy_login: row.allow_legacy_login,
1115 has_mfa: row.has_mfa,
1116 })
1117 .into_response(),
1118 Ok(None) => (
1119 StatusCode::NOT_FOUND,
1120 Json(json!({"error": "AccountNotFound"})),
1121 )
1122 .into_response(),
1123 Err(e) => {
1124 error!("DB error: {:?}", e);
1125 (
1126 StatusCode::INTERNAL_SERVER_ERROR,
1127 Json(json!({"error": "InternalError"})),
1128 )
1129 .into_response()
1130 }
1131 }
1132}
1133
1134#[derive(Deserialize)]
1135#[serde(rename_all = "camelCase")]
1136pub struct UpdateLegacyLoginInput {
1137 pub allow_legacy_login: bool,
1138}
1139
1140pub async fn update_legacy_login_preference(
1141 State(state): State<AppState>,
1142 auth: BearerAuth,
1143 Json(input): Json<UpdateLegacyLoginInput>,
1144) -> Response {
1145 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1146 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1147 .await;
1148 }
1149
1150 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1151 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1152 }
1153
1154 let result = sqlx::query!(
1155 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1156 input.allow_legacy_login,
1157 auth.0.did
1158 )
1159 .fetch_optional(&state.db)
1160 .await;
1161
1162 match result {
1163 Ok(Some(_)) => {
1164 info!(
1165 did = %auth.0.did,
1166 allow_legacy_login = input.allow_legacy_login,
1167 "Legacy login preference updated"
1168 );
1169 Json(json!({
1170 "allowLegacyLogin": input.allow_legacy_login
1171 }))
1172 .into_response()
1173 }
1174 Ok(None) => (
1175 StatusCode::NOT_FOUND,
1176 Json(json!({"error": "AccountNotFound"})),
1177 )
1178 .into_response(),
1179 Err(e) => {
1180 error!("DB error: {:?}", e);
1181 (
1182 StatusCode::INTERNAL_SERVER_ERROR,
1183 Json(json!({"error": "InternalError"})),
1184 )
1185 .into_response()
1186 }
1187 }
1188}
1189
1190use crate::comms::locale::VALID_LOCALES;
1191
1192#[derive(Deserialize)]
1193#[serde(rename_all = "camelCase")]
1194pub struct UpdateLocaleInput {
1195 pub preferred_locale: String,
1196}
1197
1198pub async fn update_locale(
1199 State(state): State<AppState>,
1200 auth: BearerAuth,
1201 Json(input): Json<UpdateLocaleInput>,
1202) -> Response {
1203 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1204 return (
1205 StatusCode::BAD_REQUEST,
1206 Json(json!({
1207 "error": "InvalidRequest",
1208 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1209 })),
1210 )
1211 .into_response();
1212 }
1213
1214 let result = sqlx::query!(
1215 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1216 input.preferred_locale,
1217 auth.0.did
1218 )
1219 .fetch_optional(&state.db)
1220 .await;
1221
1222 match result {
1223 Ok(Some(_)) => {
1224 info!(
1225 did = %auth.0.did,
1226 locale = %input.preferred_locale,
1227 "User locale preference updated"
1228 );
1229 Json(json!({
1230 "preferredLocale": input.preferred_locale
1231 }))
1232 .into_response()
1233 }
1234 Ok(None) => (
1235 StatusCode::NOT_FOUND,
1236 Json(json!({"error": "AccountNotFound"})),
1237 )
1238 .into_response(),
1239 Err(e) => {
1240 error!("DB error updating locale: {:?}", e);
1241 (
1242 StatusCode::INTERNAL_SERVER_ERROR,
1243 Json(json!({"error": "InternalError"})),
1244 )
1245 .into_response()
1246 }
1247 }
1248}