this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for")
18 && let Ok(value) = forwarded.to_str()
19 && let Some(first_ip) = value.split(',').next()
20 {
21 return first_ip.trim().to_string();
22 }
23 if let Some(real_ip) = headers.get("x-real-ip")
24 && let Ok(value) = real_ip.to_str()
25 {
26 return value.trim().to_string();
27 }
28 "unknown".to_string()
29}
30
31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
32 let identifier = identifier.trim();
33 if identifier.contains('@') || identifier.starts_with("did:") {
34 identifier.to_string()
35 } else if !identifier.contains('.') {
36 format!("{}.{}", identifier.to_lowercase(), pds_hostname)
37 } else {
38 identifier.to_lowercase()
39 }
40}
41
42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
43 stored_handle.to_string()
44}
45
46#[derive(Deserialize)]
47pub struct CreateSessionInput {
48 pub identifier: String,
49 pub password: String,
50}
51
52#[derive(Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct CreateSessionOutput {
55 pub access_jwt: String,
56 pub refresh_jwt: String,
57 pub handle: String,
58 pub did: String,
59}
60
61pub async fn create_session(
62 State(state): State<AppState>,
63 headers: HeaderMap,
64 Json(input): Json<CreateSessionInput>,
65) -> Response {
66 info!(
67 "create_session called with identifier: {}",
68 input.identifier
69 );
70 let client_ip = extract_client_ip(&headers);
71 if !state
72 .check_rate_limit(RateLimitKind::Login, &client_ip)
73 .await
74 {
75 warn!(ip = %client_ip, "Login rate limit exceeded");
76 return (
77 StatusCode::TOO_MANY_REQUESTS,
78 Json(json!({
79 "error": "RateLimitExceeded",
80 "message": "Too many login attempts. Please try again later."
81 })),
82 )
83 .into_response();
84 }
85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
87 info!(
88 "Normalized identifier: {} -> {}",
89 input.identifier, normalized_identifier
90 );
91 let row = match sqlx::query!(
92 r#"SELECT
93 u.id, u.did, u.handle, u.password_hash,
94 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
95 u.allow_legacy_login,
96 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
97 k.key_bytes, k.encryption_version,
98 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
99 FROM users u
100 JOIN user_keys k ON u.id = k.user_id
101 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
102 normalized_identifier
103 )
104 .fetch_optional(&state.db)
105 .await
106 {
107 Ok(Some(row)) => row,
108 Ok(None) => {
109 let _ = verify(
110 &input.password,
111 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
112 );
113 warn!("User not found for login attempt");
114 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
115 .into_response();
116 }
117 Err(e) => {
118 error!("Database error fetching user: {:?}", e);
119 return ApiError::InternalError.into_response();
120 }
121 };
122 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
123 Ok(k) => k,
124 Err(e) => {
125 error!("Failed to decrypt user key: {:?}", e);
126 return ApiError::InternalError.into_response();
127 }
128 };
129 let password_valid = if row
130 .password_hash
131 .as_ref()
132 .map(|h| verify(&input.password, h).unwrap_or(false))
133 .unwrap_or(false)
134 {
135 true
136 } else {
137 let app_passwords = sqlx::query!(
138 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
139 row.id
140 )
141 .fetch_all(&state.db)
142 .await
143 .unwrap_or_default();
144 app_passwords
145 .iter()
146 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
147 };
148 if !password_valid {
149 warn!("Password verification failed for login attempt");
150 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
151 .into_response();
152 }
153 let is_verified =
154 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
155 if !is_verified {
156 warn!("Login attempt for unverified account: {}", row.did);
157 return (
158 StatusCode::FORBIDDEN,
159 Json(json!({
160 "error": "AccountNotVerified",
161 "message": "Please verify your account before logging in",
162 "did": row.did
163 })),
164 )
165 .into_response();
166 }
167 let has_totp = row.totp_enabled.unwrap_or(false);
168 let is_legacy_login = has_totp;
169 if has_totp && !row.allow_legacy_login {
170 warn!(
171 "Legacy login blocked for TOTP-enabled account: {}",
172 row.did
173 );
174 return (
175 StatusCode::FORBIDDEN,
176 Json(json!({
177 "error": "MfaRequired",
178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
179 "did": row.did
180 })),
181 )
182 .into_response();
183 }
184 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
185 Ok(m) => m,
186 Err(e) => {
187 error!("Failed to create access token: {:?}", e);
188 return ApiError::InternalError.into_response();
189 }
190 };
191 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
192 Ok(m) => m,
193 Err(e) => {
194 error!("Failed to create refresh token: {:?}", e);
195 return ApiError::InternalError.into_response();
196 }
197 };
198 if let Err(e) = sqlx::query!(
199 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
200 row.did,
201 access_meta.jti,
202 refresh_meta.jti,
203 access_meta.expires_at,
204 refresh_meta.expires_at,
205 is_legacy_login,
206 false
207 )
208 .execute(&state.db)
209 .await
210 {
211 error!("Failed to insert session: {:?}", e);
212 return ApiError::InternalError.into_response();
213 }
214 if is_legacy_login {
215 warn!(
216 did = %row.did,
217 ip = %client_ip,
218 "Legacy login on TOTP-enabled account - sending notification"
219 );
220 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
221 if let Err(e) = crate::comms::queue_legacy_login_notification(
222 &state.db,
223 row.id,
224 &hostname,
225 &client_ip,
226 row.preferred_comms_channel,
227 )
228 .await
229 {
230 error!("Failed to queue legacy login notification: {:?}", e);
231 }
232 }
233 let handle = full_handle(&row.handle, &pds_hostname);
234 Json(CreateSessionOutput {
235 access_jwt: access_meta.token,
236 refresh_jwt: refresh_meta.token,
237 handle,
238 did: row.did,
239 })
240 .into_response()
241}
242
243pub async fn get_session(
244 State(state): State<AppState>,
245 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
246) -> Response {
247 let permissions = auth_user.permissions();
248 let can_read_email = permissions.allows_email_read();
249
250 match sqlx::query!(
251 r#"SELECT
252 handle, email, email_verified, is_admin, deactivated_at, preferred_locale,
253 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
254 discord_verified, telegram_verified, signal_verified
255 FROM users WHERE did = $1"#,
256 auth_user.did
257 )
258 .fetch_optional(&state.db)
259 .await
260 {
261 Ok(Some(row)) => {
262 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
263 crate::comms::CommsChannel::Email => ("email", row.email_verified),
264 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
265 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
266 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
267 };
268 let pds_hostname =
269 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
270 let handle = full_handle(&row.handle, &pds_hostname);
271 let is_active = row.deactivated_at.is_none();
272 let email_value = if can_read_email {
273 row.email.clone()
274 } else {
275 None
276 };
277 let email_verified_value = can_read_email && row.email_verified;
278 Json(json!({
279 "handle": handle,
280 "did": auth_user.did,
281 "email": email_value,
282 "emailVerified": email_verified_value,
283 "preferredChannel": preferred_channel,
284 "preferredChannelVerified": preferred_channel_verified,
285 "preferredLocale": row.preferred_locale,
286 "isAdmin": row.is_admin,
287 "active": is_active,
288 "status": if is_active { "active" } else { "deactivated" },
289 "didDoc": {}
290 }))
291 .into_response()
292 }
293 Ok(None) => ApiError::AuthenticationFailed.into_response(),
294 Err(e) => {
295 error!("Database error in get_session: {:?}", e);
296 ApiError::InternalError.into_response()
297 }
298 }
299}
300
301pub async fn delete_session(
302 State(state): State<AppState>,
303 headers: axum::http::HeaderMap,
304) -> Response {
305 let token = match crate::auth::extract_bearer_token_from_header(
306 headers.get("Authorization").and_then(|h| h.to_str().ok()),
307 ) {
308 Some(t) => t,
309 None => return ApiError::AuthenticationRequired.into_response(),
310 };
311 let jti = match crate::auth::get_jti_from_token(&token) {
312 Ok(jti) => jti,
313 Err(_) => return ApiError::AuthenticationFailed.into_response(),
314 };
315 let did = crate::auth::get_did_from_token(&token).ok();
316 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
317 .execute(&state.db)
318 .await
319 {
320 Ok(res) if res.rows_affected() > 0 => {
321 if let Some(did) = did {
322 let session_cache_key = format!("auth:session:{}:{}", did, jti);
323 let _ = state.cache.delete(&session_cache_key).await;
324 }
325 Json(json!({})).into_response()
326 }
327 Ok(_) => ApiError::AuthenticationFailed.into_response(),
328 Err(e) => {
329 error!("Database error in delete_session: {:?}", e);
330 ApiError::AuthenticationFailed.into_response()
331 }
332 }
333}
334
335pub async fn refresh_session(
336 State(state): State<AppState>,
337 headers: axum::http::HeaderMap,
338) -> Response {
339 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
340 if !state
341 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
342 .await
343 {
344 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
345 return (
346 axum::http::StatusCode::TOO_MANY_REQUESTS,
347 axum::Json(serde_json::json!({
348 "error": "RateLimitExceeded",
349 "message": "Too many requests. Please try again later."
350 })),
351 )
352 .into_response();
353 }
354 let refresh_token = match crate::auth::extract_bearer_token_from_header(
355 headers.get("Authorization").and_then(|h| h.to_str().ok()),
356 ) {
357 Some(t) => t,
358 None => return ApiError::AuthenticationRequired.into_response(),
359 };
360 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
361 Ok(jti) => jti,
362 Err(_) => {
363 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
364 .into_response();
365 }
366 };
367 let mut tx = match state.db.begin().await {
368 Ok(tx) => tx,
369 Err(e) => {
370 error!("Failed to begin transaction: {:?}", e);
371 return ApiError::InternalError.into_response();
372 }
373 };
374 if let Ok(Some(session_id)) = sqlx::query_scalar!(
375 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
376 refresh_jti
377 )
378 .fetch_optional(&mut *tx)
379 .await
380 {
381 warn!(
382 "Refresh token reuse detected! Revoking token family for session_id: {}",
383 session_id
384 );
385 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
386 .execute(&mut *tx)
387 .await;
388 let _ = tx.commit().await;
389 return ApiError::ExpiredTokenMsg(
390 "Refresh token has been revoked due to suspected compromise".into(),
391 )
392 .into_response();
393 }
394 let session_row = match sqlx::query!(
395 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
396 FROM session_tokens st
397 JOIN users u ON st.did = u.did
398 JOIN user_keys k ON u.id = k.user_id
399 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
400 FOR UPDATE OF st"#,
401 refresh_jti
402 )
403 .fetch_optional(&mut *tx)
404 .await
405 {
406 Ok(Some(row)) => row,
407 Ok(None) => {
408 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
409 .into_response();
410 }
411 Err(e) => {
412 error!("Database error fetching session: {:?}", e);
413 return ApiError::InternalError.into_response();
414 }
415 };
416 let key_bytes =
417 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
418 Ok(k) => k,
419 Err(e) => {
420 error!("Failed to decrypt user key: {:?}", e);
421 return ApiError::InternalError.into_response();
422 }
423 };
424 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
425 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
426 }
427 let new_access_meta =
428 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
429 Ok(m) => m,
430 Err(e) => {
431 error!("Failed to create access token: {:?}", e);
432 return ApiError::InternalError.into_response();
433 }
434 };
435 let new_refresh_meta =
436 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
437 Ok(m) => m,
438 Err(e) => {
439 error!("Failed to create refresh token: {:?}", e);
440 return ApiError::InternalError.into_response();
441 }
442 };
443 match sqlx::query!(
444 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
445 refresh_jti,
446 session_row.id
447 )
448 .execute(&mut *tx)
449 .await
450 {
451 Ok(result) if result.rows_affected() == 0 => {
452 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
453 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
454 .execute(&mut *tx)
455 .await;
456 let _ = tx.commit().await;
457 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
458 }
459 Err(e) => {
460 error!("Failed to record used refresh token: {:?}", e);
461 return ApiError::InternalError.into_response();
462 }
463 Ok(_) => {}
464 }
465 if let Err(e) = sqlx::query!(
466 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
467 new_access_meta.jti,
468 new_refresh_meta.jti,
469 new_access_meta.expires_at,
470 new_refresh_meta.expires_at,
471 session_row.id
472 )
473 .execute(&mut *tx)
474 .await
475 {
476 error!("Database error updating session: {:?}", e);
477 return ApiError::InternalError.into_response();
478 }
479 if let Err(e) = tx.commit().await {
480 error!("Failed to commit transaction: {:?}", e);
481 return ApiError::InternalError.into_response();
482 }
483 match sqlx::query!(
484 r#"SELECT
485 handle, email, email_verified, is_admin, preferred_locale,
486 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
487 discord_verified, telegram_verified, signal_verified
488 FROM users WHERE did = $1"#,
489 session_row.did
490 )
491 .fetch_optional(&state.db)
492 .await
493 {
494 Ok(Some(u)) => {
495 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
496 crate::comms::CommsChannel::Email => ("email", u.email_verified),
497 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
498 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
499 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
500 };
501 let pds_hostname =
502 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
503 let handle = full_handle(&u.handle, &pds_hostname);
504 Json(json!({
505 "accessJwt": new_access_meta.token,
506 "refreshJwt": new_refresh_meta.token,
507 "handle": handle,
508 "did": session_row.did,
509 "email": u.email,
510 "emailVerified": u.email_verified,
511 "preferredChannel": preferred_channel,
512 "preferredChannelVerified": preferred_channel_verified,
513 "preferredLocale": u.preferred_locale,
514 "isAdmin": u.is_admin,
515 "active": true
516 }))
517 .into_response()
518 }
519 Ok(None) => {
520 error!("User not found for existing session: {}", session_row.did);
521 ApiError::InternalError.into_response()
522 }
523 Err(e) => {
524 error!("Database error fetching user: {:?}", e);
525 ApiError::InternalError.into_response()
526 }
527 }
528}
529
530#[derive(Deserialize)]
531#[serde(rename_all = "camelCase")]
532pub struct ConfirmSignupInput {
533 pub did: String,
534 pub verification_code: String,
535}
536
537#[derive(Serialize)]
538#[serde(rename_all = "camelCase")]
539pub struct ConfirmSignupOutput {
540 pub access_jwt: String,
541 pub refresh_jwt: String,
542 pub handle: String,
543 pub did: String,
544 pub email: Option<String>,
545 pub email_verified: bool,
546 pub preferred_channel: String,
547 pub preferred_channel_verified: bool,
548}
549
550pub async fn confirm_signup(
551 State(state): State<AppState>,
552 Json(input): Json<ConfirmSignupInput>,
553) -> Response {
554 info!("confirm_signup called for DID: {}", input.did);
555 let row = match sqlx::query!(
556 r#"SELECT
557 u.id, u.did, u.handle, u.email,
558 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
559 k.key_bytes, k.encryption_version
560 FROM users u
561 JOIN user_keys k ON u.id = k.user_id
562 WHERE u.did = $1"#,
563 input.did
564 )
565 .fetch_optional(&state.db)
566 .await
567 {
568 Ok(Some(row)) => row,
569 Ok(None) => {
570 warn!("User not found for confirm_signup: {}", input.did);
571 return ApiError::InvalidRequest("Invalid DID or verification code".into())
572 .into_response();
573 }
574 Err(e) => {
575 error!("Database error in confirm_signup: {:?}", e);
576 return ApiError::InternalError.into_response();
577 }
578 };
579
580 let channel_str = match row.channel {
581 crate::comms::CommsChannel::Email => "email",
582 crate::comms::CommsChannel::Discord => "discord",
583 crate::comms::CommsChannel::Telegram => "telegram",
584 crate::comms::CommsChannel::Signal => "signal",
585 };
586 let verification = match sqlx::query!(
587 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel",
588 row.id,
589 channel_str as _
590 )
591 .fetch_optional(&state.db)
592 .await
593 {
594 Ok(Some(v)) => v,
595 Ok(None) => {
596 warn!("No verification code found for user: {}", input.did);
597 return ApiError::InvalidRequest("No pending verification".into()).into_response();
598 }
599 Err(e) => {
600 error!("Database error fetching verification: {:?}", e);
601 return ApiError::InternalError.into_response();
602 }
603 };
604
605 if verification.code != input.verification_code {
606 warn!("Invalid verification code for user: {}", input.did);
607 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
608 }
609 if verification.expires_at < Utc::now() {
610 warn!("Verification code expired for user: {}", input.did);
611 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
612 }
613
614 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
615 Ok(k) => k,
616 Err(e) => {
617 error!("Failed to decrypt user key: {:?}", e);
618 return ApiError::InternalError.into_response();
619 }
620 };
621 let verified_column = match row.channel {
622 crate::comms::CommsChannel::Email => "email_verified",
623 crate::comms::CommsChannel::Discord => "discord_verified",
624 crate::comms::CommsChannel::Telegram => "telegram_verified",
625 crate::comms::CommsChannel::Signal => "signal_verified",
626 };
627 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
628 if let Err(e) = sqlx::query(&update_query)
629 .bind(&input.did)
630 .execute(&state.db)
631 .await
632 {
633 error!("Failed to update verification status: {:?}", e);
634 return ApiError::InternalError.into_response();
635 }
636
637 if let Err(e) = sqlx::query!(
638 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel",
639 row.id,
640 channel_str as _
641 )
642 .execute(&state.db)
643 .await
644 {
645 error!("Failed to delete verification record: {:?}", e);
646 }
647
648 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
649 Ok(m) => m,
650 Err(e) => {
651 error!("Failed to create access token: {:?}", e);
652 return ApiError::InternalError.into_response();
653 }
654 };
655 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
656 Ok(m) => m,
657 Err(e) => {
658 error!("Failed to create refresh token: {:?}", e);
659 return ApiError::InternalError.into_response();
660 }
661 };
662 if let Err(e) = sqlx::query!(
663 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
664 row.did,
665 access_meta.jti,
666 refresh_meta.jti,
667 access_meta.expires_at,
668 refresh_meta.expires_at,
669 false,
670 false
671 )
672 .execute(&state.db)
673 .await
674 {
675 error!("Failed to insert session: {:?}", e);
676 return ApiError::InternalError.into_response();
677 }
678 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
679 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
680 warn!("Failed to enqueue welcome notification: {:?}", e);
681 }
682 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
683 let preferred_channel = match row.channel {
684 crate::comms::CommsChannel::Email => "email",
685 crate::comms::CommsChannel::Discord => "discord",
686 crate::comms::CommsChannel::Telegram => "telegram",
687 crate::comms::CommsChannel::Signal => "signal",
688 };
689 Json(ConfirmSignupOutput {
690 access_jwt: access_meta.token,
691 refresh_jwt: refresh_meta.token,
692 handle: row.handle,
693 did: row.did,
694 email: row.email,
695 email_verified,
696 preferred_channel: preferred_channel.to_string(),
697 preferred_channel_verified: true,
698 })
699 .into_response()
700}
701
702#[derive(Deserialize)]
703#[serde(rename_all = "camelCase")]
704pub struct ResendVerificationInput {
705 pub did: String,
706}
707
708pub async fn resend_verification(
709 State(state): State<AppState>,
710 Json(input): Json<ResendVerificationInput>,
711) -> Response {
712 info!("resend_verification called for DID: {}", input.did);
713 let row = match sqlx::query!(
714 r#"SELECT
715 id, handle, email,
716 preferred_comms_channel as "channel: crate::comms::CommsChannel",
717 discord_id, telegram_username, signal_number,
718 email_verified, discord_verified, telegram_verified, signal_verified
719 FROM users
720 WHERE did = $1"#,
721 input.did
722 )
723 .fetch_optional(&state.db)
724 .await
725 {
726 Ok(Some(row)) => row,
727 Ok(None) => {
728 return ApiError::InvalidRequest("User not found".into()).into_response();
729 }
730 Err(e) => {
731 error!("Database error in resend_verification: {:?}", e);
732 return ApiError::InternalError.into_response();
733 }
734 };
735 let is_verified =
736 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
737 if is_verified {
738 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
739 }
740 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
741 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
742
743 let (channel_str, recipient) = match row.channel {
744 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
745 crate::comms::CommsChannel::Discord => {
746 ("discord", row.discord_id.clone().unwrap_or_default())
747 }
748 crate::comms::CommsChannel::Telegram => (
749 "telegram",
750 row.telegram_username.clone().unwrap_or_default(),
751 ),
752 crate::comms::CommsChannel::Signal => {
753 ("signal", row.signal_number.clone().unwrap_or_default())
754 }
755 };
756
757 if let Err(e) = sqlx::query!(
758 r#"
759 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at)
760 VALUES ($1, $2::comms_channel, $3, $4, $5)
761 ON CONFLICT (user_id, channel) DO UPDATE
762 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW()
763 "#,
764 row.id,
765 channel_str as _,
766 verification_code,
767 recipient,
768 code_expires_at
769 )
770 .execute(&state.db)
771 .await
772 {
773 error!("Failed to update verification code: {:?}", e);
774 return ApiError::InternalError.into_response();
775 }
776 if let Err(e) = crate::comms::enqueue_signup_verification(
777 &state.db,
778 row.id,
779 channel_str,
780 &recipient,
781 &verification_code,
782 None,
783 )
784 .await
785 {
786 warn!("Failed to enqueue verification notification: {:?}", e);
787 }
788 Json(json!({"success": true})).into_response()
789}
790
791#[derive(Serialize)]
792#[serde(rename_all = "camelCase")]
793pub struct SessionInfo {
794 pub id: String,
795 pub session_type: String,
796 pub client_name: Option<String>,
797 pub created_at: String,
798 pub expires_at: String,
799 pub is_current: bool,
800}
801
802#[derive(Serialize)]
803#[serde(rename_all = "camelCase")]
804pub struct ListSessionsOutput {
805 pub sessions: Vec<SessionInfo>,
806}
807
808pub async fn list_sessions(
809 State(state): State<AppState>,
810 headers: HeaderMap,
811 auth: BearerAuth,
812) -> Response {
813 let current_jti = headers
814 .get("authorization")
815 .and_then(|v| v.to_str().ok())
816 .and_then(|v| v.strip_prefix("Bearer "))
817 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
818
819 let mut sessions: Vec<SessionInfo> = Vec::new();
820
821 let jwt_result = sqlx::query_as::<
822 _,
823 (
824 i32,
825 String,
826 chrono::DateTime<chrono::Utc>,
827 chrono::DateTime<chrono::Utc>,
828 ),
829 >(
830 r#"
831 SELECT id, access_jti, created_at, refresh_expires_at
832 FROM session_tokens
833 WHERE did = $1 AND refresh_expires_at > NOW()
834 ORDER BY created_at DESC
835 "#,
836 )
837 .bind(&auth.0.did)
838 .fetch_all(&state.db)
839 .await;
840
841 match jwt_result {
842 Ok(rows) => {
843 for (id, access_jti, created_at, expires_at) in rows {
844 sessions.push(SessionInfo {
845 id: format!("jwt:{}", id),
846 session_type: "legacy".to_string(),
847 client_name: None,
848 created_at: created_at.to_rfc3339(),
849 expires_at: expires_at.to_rfc3339(),
850 is_current: current_jti.as_ref() == Some(&access_jti),
851 });
852 }
853 }
854 Err(e) => {
855 error!("DB error fetching JWT sessions: {:?}", e);
856 return (
857 StatusCode::INTERNAL_SERVER_ERROR,
858 Json(json!({"error": "InternalError"})),
859 )
860 .into_response();
861 }
862 }
863
864 let oauth_result = sqlx::query_as::<
865 _,
866 (
867 i32,
868 String,
869 chrono::DateTime<chrono::Utc>,
870 chrono::DateTime<chrono::Utc>,
871 String,
872 ),
873 >(
874 r#"
875 SELECT id, token_id, created_at, expires_at, client_id
876 FROM oauth_token
877 WHERE did = $1 AND expires_at > NOW()
878 ORDER BY created_at DESC
879 "#,
880 )
881 .bind(&auth.0.did)
882 .fetch_all(&state.db)
883 .await;
884
885 match oauth_result {
886 Ok(rows) => {
887 for (id, token_id, created_at, expires_at, client_id) in rows {
888 let client_name = extract_client_name(&client_id);
889 let is_current_oauth = auth.0.is_oauth
890 && current_jti.as_ref() == Some(&token_id);
891 sessions.push(SessionInfo {
892 id: format!("oauth:{}", id),
893 session_type: "oauth".to_string(),
894 client_name: Some(client_name),
895 created_at: created_at.to_rfc3339(),
896 expires_at: expires_at.to_rfc3339(),
897 is_current: is_current_oauth,
898 });
899 }
900 }
901 Err(e) => {
902 error!("DB error fetching OAuth sessions: {:?}", e);
903 return (
904 StatusCode::INTERNAL_SERVER_ERROR,
905 Json(json!({"error": "InternalError"})),
906 )
907 .into_response();
908 }
909 }
910
911 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
912
913 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
914}
915
916fn extract_client_name(client_id: &str) -> String {
917 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
918 "Localhost App".to_string()
919 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
920 parsed.host_str().unwrap_or("Unknown App").to_string()
921 } else {
922 client_id.to_string()
923 }
924}
925
926#[derive(Deserialize)]
927#[serde(rename_all = "camelCase")]
928pub struct RevokeSessionInput {
929 pub session_id: String,
930}
931
932pub async fn revoke_session(
933 State(state): State<AppState>,
934 auth: BearerAuth,
935 Json(input): Json<RevokeSessionInput>,
936) -> Response {
937 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
938 let session_id: i32 = match jwt_id.parse() {
939 Ok(id) => id,
940 Err(_) => {
941 return (
942 StatusCode::BAD_REQUEST,
943 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
944 )
945 .into_response();
946 }
947 };
948 let session = sqlx::query_as::<_, (String,)>(
949 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
950 )
951 .bind(session_id)
952 .bind(&auth.0.did)
953 .fetch_optional(&state.db)
954 .await;
955 let access_jti = match session {
956 Ok(Some((jti,))) => jti,
957 Ok(None) => {
958 return (
959 StatusCode::NOT_FOUND,
960 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
961 )
962 .into_response();
963 }
964 Err(e) => {
965 error!("DB error in revoke_session: {:?}", e);
966 return (
967 StatusCode::INTERNAL_SERVER_ERROR,
968 Json(json!({"error": "InternalError"})),
969 )
970 .into_response();
971 }
972 };
973 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
974 .bind(session_id)
975 .execute(&state.db)
976 .await
977 {
978 error!("DB error deleting session: {:?}", e);
979 return (
980 StatusCode::INTERNAL_SERVER_ERROR,
981 Json(json!({"error": "InternalError"})),
982 )
983 .into_response();
984 }
985 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
986 if let Err(e) = state.cache.delete(&cache_key).await {
987 warn!("Failed to invalidate session cache: {:?}", e);
988 }
989 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
990 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
991 let session_id: i32 = match oauth_id.parse() {
992 Ok(id) => id,
993 Err(_) => {
994 return (
995 StatusCode::BAD_REQUEST,
996 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
997 )
998 .into_response();
999 }
1000 };
1001 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1002 .bind(session_id)
1003 .bind(&auth.0.did)
1004 .execute(&state.db)
1005 .await;
1006 match result {
1007 Ok(r) if r.rows_affected() == 0 => {
1008 return (
1009 StatusCode::NOT_FOUND,
1010 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1011 )
1012 .into_response();
1013 }
1014 Err(e) => {
1015 error!("DB error deleting OAuth session: {:?}", e);
1016 return (
1017 StatusCode::INTERNAL_SERVER_ERROR,
1018 Json(json!({"error": "InternalError"})),
1019 )
1020 .into_response();
1021 }
1022 _ => {}
1023 }
1024 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1025 } else {
1026 return (
1027 StatusCode::BAD_REQUEST,
1028 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1029 )
1030 .into_response();
1031 }
1032 (StatusCode::OK, Json(json!({}))).into_response()
1033}
1034
1035pub async fn revoke_all_sessions(
1036 State(state): State<AppState>,
1037 headers: HeaderMap,
1038 auth: BearerAuth,
1039) -> Response {
1040 let current_jti = headers
1041 .get("authorization")
1042 .and_then(|v| v.to_str().ok())
1043 .and_then(|v| v.strip_prefix("Bearer "))
1044 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1045
1046 if let Some(ref jti) = current_jti {
1047 if auth.0.is_oauth {
1048 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1049 .bind(&auth.0.did)
1050 .execute(&state.db)
1051 .await
1052 {
1053 error!("DB error revoking JWT sessions: {:?}", e);
1054 return (
1055 StatusCode::INTERNAL_SERVER_ERROR,
1056 Json(json!({"error": "InternalError"})),
1057 )
1058 .into_response();
1059 }
1060 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1061 .bind(&auth.0.did)
1062 .bind(jti)
1063 .execute(&state.db)
1064 .await
1065 {
1066 error!("DB error revoking OAuth sessions: {:?}", e);
1067 return (
1068 StatusCode::INTERNAL_SERVER_ERROR,
1069 Json(json!({"error": "InternalError"})),
1070 )
1071 .into_response();
1072 }
1073 } else {
1074 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1075 .bind(&auth.0.did)
1076 .bind(jti)
1077 .execute(&state.db)
1078 .await
1079 {
1080 error!("DB error revoking JWT sessions: {:?}", e);
1081 return (
1082 StatusCode::INTERNAL_SERVER_ERROR,
1083 Json(json!({"error": "InternalError"})),
1084 )
1085 .into_response();
1086 }
1087 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1088 .bind(&auth.0.did)
1089 .execute(&state.db)
1090 .await
1091 {
1092 error!("DB error revoking OAuth sessions: {:?}", e);
1093 return (
1094 StatusCode::INTERNAL_SERVER_ERROR,
1095 Json(json!({"error": "InternalError"})),
1096 )
1097 .into_response();
1098 }
1099 }
1100 } else {
1101 return (
1102 StatusCode::BAD_REQUEST,
1103 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1104 )
1105 .into_response();
1106 }
1107
1108 info!(did = %auth.0.did, "All other sessions revoked");
1109 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1110}
1111
1112#[derive(Serialize)]
1113#[serde(rename_all = "camelCase")]
1114pub struct LegacyLoginPreferenceOutput {
1115 pub allow_legacy_login: bool,
1116 pub has_mfa: bool,
1117}
1118
1119pub async fn get_legacy_login_preference(
1120 State(state): State<AppState>,
1121 auth: BearerAuth,
1122) -> Response {
1123 let result = sqlx::query!(
1124 r#"SELECT
1125 u.allow_legacy_login,
1126 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1127 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1128 FROM users u WHERE u.did = $1"#,
1129 auth.0.did
1130 )
1131 .fetch_optional(&state.db)
1132 .await;
1133
1134 match result {
1135 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1136 allow_legacy_login: row.allow_legacy_login,
1137 has_mfa: row.has_mfa,
1138 })
1139 .into_response(),
1140 Ok(None) => (
1141 StatusCode::NOT_FOUND,
1142 Json(json!({"error": "AccountNotFound"})),
1143 )
1144 .into_response(),
1145 Err(e) => {
1146 error!("DB error: {:?}", e);
1147 (
1148 StatusCode::INTERNAL_SERVER_ERROR,
1149 Json(json!({"error": "InternalError"})),
1150 )
1151 .into_response()
1152 }
1153 }
1154}
1155
1156#[derive(Deserialize)]
1157#[serde(rename_all = "camelCase")]
1158pub struct UpdateLegacyLoginInput {
1159 pub allow_legacy_login: bool,
1160}
1161
1162pub async fn update_legacy_login_preference(
1163 State(state): State<AppState>,
1164 auth: BearerAuth,
1165 Json(input): Json<UpdateLegacyLoginInput>,
1166) -> Response {
1167 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1168 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1169 .await;
1170 }
1171
1172 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1173 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1174 }
1175
1176 let result = sqlx::query!(
1177 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1178 input.allow_legacy_login,
1179 auth.0.did
1180 )
1181 .fetch_optional(&state.db)
1182 .await;
1183
1184 match result {
1185 Ok(Some(_)) => {
1186 info!(
1187 did = %auth.0.did,
1188 allow_legacy_login = input.allow_legacy_login,
1189 "Legacy login preference updated"
1190 );
1191 Json(json!({
1192 "allowLegacyLogin": input.allow_legacy_login
1193 }))
1194 .into_response()
1195 }
1196 Ok(None) => (
1197 StatusCode::NOT_FOUND,
1198 Json(json!({"error": "AccountNotFound"})),
1199 )
1200 .into_response(),
1201 Err(e) => {
1202 error!("DB error: {:?}", e);
1203 (
1204 StatusCode::INTERNAL_SERVER_ERROR,
1205 Json(json!({"error": "InternalError"})),
1206 )
1207 .into_response()
1208 }
1209 }
1210}
1211
1212const VALID_LOCALES: &[&str] = &["en", "zh", "ja", "ko"];
1213
1214#[derive(Deserialize)]
1215#[serde(rename_all = "camelCase")]
1216pub struct UpdateLocaleInput {
1217 pub preferred_locale: String,
1218}
1219
1220pub async fn update_locale(
1221 State(state): State<AppState>,
1222 auth: BearerAuth,
1223 Json(input): Json<UpdateLocaleInput>,
1224) -> Response {
1225 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1226 return (
1227 StatusCode::BAD_REQUEST,
1228 Json(json!({
1229 "error": "InvalidRequest",
1230 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1231 })),
1232 )
1233 .into_response();
1234 }
1235
1236 let result = sqlx::query!(
1237 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1238 input.preferred_locale,
1239 auth.0.did
1240 )
1241 .fetch_optional(&state.db)
1242 .await;
1243
1244 match result {
1245 Ok(Some(_)) => {
1246 info!(
1247 did = %auth.0.did,
1248 locale = %input.preferred_locale,
1249 "User locale preference updated"
1250 );
1251 Json(json!({
1252 "preferredLocale": input.preferred_locale
1253 }))
1254 .into_response()
1255 }
1256 Ok(None) => (
1257 StatusCode::NOT_FOUND,
1258 Json(json!({"error": "AccountNotFound"})),
1259 )
1260 .into_response(),
1261 Err(e) => {
1262 error!("DB error updating locale: {:?}", e);
1263 (
1264 StatusCode::INTERNAL_SERVER_ERROR,
1265 Json(json!({"error": "InternalError"})),
1266 )
1267 .into_response()
1268 }
1269 }
1270}