this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
/// Canonicalises a login identifier before it is matched against stored accounts.
///
/// After trimming surrounding whitespace:
/// * identifiers containing `@` or starting with `did:` are returned unchanged;
/// * identifiers without a `.` are lower-cased and suffixed with `.{pds_hostname}`;
/// * everything else is simply lower-cased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();
    let passthrough = trimmed.starts_with("did:") || trimmed.contains('@');
    match (passthrough, trimmed.contains('.')) {
        (true, _) => trimmed.to_owned(),
        (false, true) => trimmed.to_lowercase(),
        (false, false) => [trimmed.to_lowercase().as_str(), pds_hostname].join("."),
    }
}
40
/// Returns the handle exactly as stored.
///
/// The hostname argument is accepted but not used.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
44
/// JSON request body accepted by `create_session`.
#[derive(Deserialize)]
pub struct CreateSessionInput {
    /// Login identifier; after normalisation it is matched against the user's handle,
    /// email or DID.
    pub identifier: String,
    /// Secret checked against the account password hash and, failing that, the
    /// account's app passwords.
    pub password: String,
}
50
/// Successful response body of `create_session` (serialised with camelCase keys).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the account as stored in the database.
    pub handle: String,
    /// DID of the account that logged in.
    pub did: String,
}
59
60pub async fn create_session(
61 State(state): State<AppState>,
62 headers: HeaderMap,
63 Json(input): Json<CreateSessionInput>,
64) -> Response {
65 info!(
66 "create_session called with identifier: {}",
67 input.identifier
68 );
69 let client_ip = extract_client_ip(&headers);
70 if !state
71 .check_rate_limit(RateLimitKind::Login, &client_ip)
72 .await
73 {
74 warn!(ip = %client_ip, "Login rate limit exceeded");
75 return (
76 StatusCode::TOO_MANY_REQUESTS,
77 Json(json!({
78 "error": "RateLimitExceeded",
79 "message": "Too many login attempts. Please try again later."
80 })),
81 )
82 .into_response();
83 }
84 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
85 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
86 info!(
87 "Normalized identifier: {} -> {}",
88 input.identifier, normalized_identifier
89 );
90 let row = match sqlx::query!(
91 r#"SELECT
92 u.id, u.did, u.handle, u.password_hash,
93 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
94 u.allow_legacy_login,
95 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
96 k.key_bytes, k.encryption_version,
97 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
98 FROM users u
99 JOIN user_keys k ON u.id = k.user_id
100 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
101 normalized_identifier
102 )
103 .fetch_optional(&state.db)
104 .await
105 {
106 Ok(Some(row)) => row,
107 Ok(None) => {
108 let _ = verify(
109 &input.password,
110 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
111 );
112 warn!("User not found for login attempt");
113 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
114 .into_response();
115 }
116 Err(e) => {
117 error!("Database error fetching user: {:?}", e);
118 return ApiError::InternalError.into_response();
119 }
120 };
121 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
122 Ok(k) => k,
123 Err(e) => {
124 error!("Failed to decrypt user key: {:?}", e);
125 return ApiError::InternalError.into_response();
126 }
127 };
128 let (password_valid, app_password_scopes, app_password_controller) = if row
129 .password_hash
130 .as_ref()
131 .map(|h| verify(&input.password, h).unwrap_or(false))
132 .unwrap_or(false)
133 {
134 (true, None, None)
135 } else {
136 let app_passwords = sqlx::query!(
137 "SELECT password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
138 row.id
139 )
140 .fetch_all(&state.db)
141 .await
142 .unwrap_or_default();
143 let matched = app_passwords
144 .iter()
145 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
146 match matched {
147 Some(app) => (
148 true,
149 app.scopes.clone(),
150 app.created_by_controller_did.clone(),
151 ),
152 None => (false, None, None),
153 }
154 };
155 if !password_valid {
156 warn!("Password verification failed for login attempt");
157 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
158 .into_response();
159 }
160 let is_verified =
161 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
162 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
163 .await
164 .unwrap_or(false);
165 if !is_verified && !is_delegated {
166 warn!("Login attempt for unverified account: {}", row.did);
167 return (
168 StatusCode::FORBIDDEN,
169 Json(json!({
170 "error": "AccountNotVerified",
171 "message": "Please verify your account before logging in",
172 "did": row.did
173 })),
174 )
175 .into_response();
176 }
177 let has_totp = row.totp_enabled.unwrap_or(false);
178 let is_legacy_login = has_totp;
179 if has_totp && !row.allow_legacy_login {
180 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
181 return (
182 StatusCode::FORBIDDEN,
183 Json(json!({
184 "error": "MfaRequired",
185 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
186 "did": row.did
187 })),
188 )
189 .into_response();
190 }
191 let access_meta = match crate::auth::create_access_token_with_delegation(
192 &row.did,
193 &key_bytes,
194 app_password_scopes.as_deref(),
195 app_password_controller.as_deref(),
196 ) {
197 Ok(m) => m,
198 Err(e) => {
199 error!("Failed to create access token: {:?}", e);
200 return ApiError::InternalError.into_response();
201 }
202 };
203 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
204 Ok(m) => m,
205 Err(e) => {
206 error!("Failed to create refresh token: {:?}", e);
207 return ApiError::InternalError.into_response();
208 }
209 };
210 if let Err(e) = sqlx::query!(
211 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
212 row.did,
213 access_meta.jti,
214 refresh_meta.jti,
215 access_meta.expires_at,
216 refresh_meta.expires_at,
217 is_legacy_login,
218 false,
219 app_password_scopes,
220 app_password_controller
221 )
222 .execute(&state.db)
223 .await
224 {
225 error!("Failed to insert session: {:?}", e);
226 return ApiError::InternalError.into_response();
227 }
228 if is_legacy_login {
229 warn!(
230 did = %row.did,
231 ip = %client_ip,
232 "Legacy login on TOTP-enabled account - sending notification"
233 );
234 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
235 if let Err(e) = crate::comms::queue_legacy_login_notification(
236 &state.db,
237 row.id,
238 &hostname,
239 &client_ip,
240 row.preferred_comms_channel,
241 )
242 .await
243 {
244 error!("Failed to queue legacy login notification: {:?}", e);
245 }
246 }
247 let handle = full_handle(&row.handle, &pds_hostname);
248 Json(CreateSessionOutput {
249 access_jwt: access_meta.token,
250 refresh_jwt: refresh_meta.token,
251 handle,
252 did: row.did,
253 })
254 .into_response()
255}
256
257pub async fn get_session(
258 State(state): State<AppState>,
259 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
260) -> Response {
261 let permissions = auth_user.permissions();
262 let can_read_email = permissions.allows_email_read();
263
264 match sqlx::query!(
265 r#"SELECT
266 handle, email, email_verified, is_admin, deactivated_at, preferred_locale,
267 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
268 discord_verified, telegram_verified, signal_verified
269 FROM users WHERE did = $1"#,
270 auth_user.did
271 )
272 .fetch_optional(&state.db)
273 .await
274 {
275 Ok(Some(row)) => {
276 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
277 crate::comms::CommsChannel::Email => ("email", row.email_verified),
278 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
279 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
280 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
281 };
282 let pds_hostname =
283 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
284 let handle = full_handle(&row.handle, &pds_hostname);
285 let is_active = row.deactivated_at.is_none();
286 let email_value = if can_read_email {
287 row.email.clone()
288 } else {
289 None
290 };
291 let email_verified_value = can_read_email && row.email_verified;
292 Json(json!({
293 "handle": handle,
294 "did": auth_user.did,
295 "email": email_value,
296 "emailVerified": email_verified_value,
297 "preferredChannel": preferred_channel,
298 "preferredChannelVerified": preferred_channel_verified,
299 "preferredLocale": row.preferred_locale,
300 "isAdmin": row.is_admin,
301 "active": is_active,
302 "status": if is_active { "active" } else { "deactivated" },
303 "didDoc": {}
304 }))
305 .into_response()
306 }
307 Ok(None) => ApiError::AuthenticationFailed.into_response(),
308 Err(e) => {
309 error!("Database error in get_session: {:?}", e);
310 ApiError::InternalError.into_response()
311 }
312 }
313}
314
315pub async fn delete_session(
316 State(state): State<AppState>,
317 headers: axum::http::HeaderMap,
318) -> Response {
319 let token = match crate::auth::extract_bearer_token_from_header(
320 headers.get("Authorization").and_then(|h| h.to_str().ok()),
321 ) {
322 Some(t) => t,
323 None => return ApiError::AuthenticationRequired.into_response(),
324 };
325 let jti = match crate::auth::get_jti_from_token(&token) {
326 Ok(jti) => jti,
327 Err(_) => return ApiError::AuthenticationFailed.into_response(),
328 };
329 let did = crate::auth::get_did_from_token(&token).ok();
330 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
331 .execute(&state.db)
332 .await
333 {
334 Ok(res) if res.rows_affected() > 0 => {
335 if let Some(did) = did {
336 let session_cache_key = format!("auth:session:{}:{}", did, jti);
337 let _ = state.cache.delete(&session_cache_key).await;
338 }
339 Json(json!({})).into_response()
340 }
341 Ok(_) => ApiError::AuthenticationFailed.into_response(),
342 Err(e) => {
343 error!("Database error in delete_session: {:?}", e);
344 ApiError::AuthenticationFailed.into_response()
345 }
346 }
347}
348
/// Rotates a session: exchanges the bearer refresh token for a new access/refresh pair.
///
/// Flow:
/// 1. Rate-limit by client IP (`RateLimitKind::RefreshSession`).
/// 2. Inside one transaction, look the refresh `jti` up in `used_refresh_tokens`; a hit
///    means the token was already exchanged, so the owning session is deleted and the
///    request is rejected.
/// 3. Load and lock the live session row, verify the token against the user's key, and
///    mint new tokens carrying the session's existing scope and controller DID.
/// 4. Record the old `jti` as used, update the session row with the new identifiers and
///    commit.
/// 5. Return the new tokens together with the user's profile fields.
///
/// NOTE(review): early returns after `begin()` leave the transaction uncommitted; sqlx
/// is expected to roll it back when `tx` is dropped — confirm.
pub async fn refresh_session(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Response {
    let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
    if !state
        .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
        .await
    {
        tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
        return (
            axum::http::StatusCode::TOO_MANY_REQUESTS,
            axum::Json(serde_json::json!({
                "error": "RateLimitExceeded",
                "message": "Too many requests. Please try again later."
            })),
        )
            .into_response();
    }
    let refresh_token = match crate::auth::extract_bearer_token_from_header(
        headers.get("Authorization").and_then(|h| h.to_str().ok()),
    ) {
        Some(t) => t,
        None => return ApiError::AuthenticationRequired.into_response(),
    };
    let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
        Ok(jti) => jti,
        Err(_) => {
            return ApiError::AuthenticationFailedMsg("Invalid token format".into())
                .into_response();
        }
    };
    let mut tx = match state.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            error!("Failed to begin transaction: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Reuse detection: this jti was already exchanged once, so revoke the whole session.
    // NOTE(review): an `Err` from this lookup is not handled here; execution simply
    // continues with the next query.
    if let Ok(Some(session_id)) = sqlx::query_scalar!(
        "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        warn!(
            "Refresh token reuse detected! Revoking token family for session_id: {}",
            session_id
        );
        // Results of the delete and the commit are deliberately ignored; the request is
        // rejected either way.
        let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
            .execute(&mut *tx)
            .await;
        let _ = tx.commit().await;
        return ApiError::ExpiredTokenMsg(
            "Refresh token has been revoked due to suspected compromise".into(),
        )
        .into_response();
    }
    // Fetch the unexpired session for this refresh jti and lock its row for the rest of
    // the transaction.
    let session_row = match sqlx::query!(
        r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
        FROM session_tokens st
        JOIN users u ON st.did = u.did
        JOIN user_keys k ON u.id = k.user_id
        WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
        FOR UPDATE OF st"#,
        refresh_jti
    )
    .fetch_optional(&mut *tx)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error fetching session: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let key_bytes =
        match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
            Ok(k) => k,
            Err(e) => {
                error!("Failed to decrypt user key: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // The jti lookup above only used the token's claimed id; verify the token itself
    // against the user's key before issuing anything.
    if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
        return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
    }
    let new_access_meta = match crate::auth::create_access_token_with_delegation(
        &session_row.did,
        &key_bytes,
        session_row.scope.as_deref(),
        session_row.controller_did.as_deref(),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let new_refresh_meta =
        match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
            Ok(m) => m,
            Err(e) => {
                error!("Failed to create refresh token: {:?}", e);
                return ApiError::InternalError.into_response();
            }
        };
    // Mark the old jti as used. Zero affected rows means another request recorded the
    // same jti first (ON CONFLICT DO NOTHING), which is treated as reuse.
    match sqlx::query!(
        "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
        refresh_jti,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        Ok(result) if result.rows_affected() == 0 => {
            warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
            let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
                .execute(&mut *tx)
                .await;
            let _ = tx.commit().await;
            return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
        }
        Err(e) => {
            error!("Failed to record used refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
        Ok(_) => {}
    }
    // Point the session row at the freshly minted token identifiers and expiries.
    if let Err(e) = sqlx::query!(
        "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
        new_access_meta.jti,
        new_refresh_meta.jti,
        new_access_meta.expires_at,
        new_refresh_meta.expires_at,
        session_row.id
    )
    .execute(&mut *tx)
    .await
    {
        error!("Database error updating session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    if let Err(e) = tx.commit().await {
        error!("Failed to commit transaction: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    // The rotation is committed at this point; the profile lookup below runs outside the
    // transaction, so a failure here returns an error even though the old refresh token
    // has already been consumed.
    match sqlx::query!(
        r#"SELECT
            handle, email, email_verified, is_admin, preferred_locale,
            preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
            discord_verified, telegram_verified, signal_verified
        FROM users WHERE did = $1"#,
        session_row.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(u)) => {
            let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
                crate::comms::CommsChannel::Email => ("email", u.email_verified),
                crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
                crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
                crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
            };
            let pds_hostname =
                std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
            let handle = full_handle(&u.handle, &pds_hostname);
            // NOTE(review): "active" is hard-coded to true and the email is returned
            // unconditionally here, unlike get_session — confirm this is intended.
            Json(json!({
                "accessJwt": new_access_meta.token,
                "refreshJwt": new_refresh_meta.token,
                "handle": handle,
                "did": session_row.did,
                "email": u.email,
                "emailVerified": u.email_verified,
                "preferredChannel": preferred_channel,
                "preferredChannelVerified": preferred_channel_verified,
                "preferredLocale": u.preferred_locale,
                "isAdmin": u.is_admin,
                "active": true
            }))
            .into_response()
        }
        Ok(None) => {
            error!("User not found for existing session: {}", session_row.did);
            ApiError::InternalError.into_response()
        }
        Err(e) => {
            error!("Database error fetching user: {:?}", e);
            ApiError::InternalError.into_response()
        }
    }
}
547
/// JSON request body accepted by `confirm_signup` (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupInput {
    /// DID of the account being confirmed.
    pub did: String,
    /// Verification code as entered by the user; it is normalised before being checked.
    pub verification_code: String,
}
554
/// Successful response body of `confirm_signup` (serialised with camelCase keys).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmSignupOutput {
    /// Newly issued access token.
    pub access_jwt: String,
    /// Newly issued refresh token.
    pub refresh_jwt: String,
    /// Handle of the account as stored in the database.
    pub handle: String,
    /// DID of the confirmed account.
    pub did: String,
    /// Email address on file, if any.
    pub email: Option<String>,
    /// `true` only when the channel that was just confirmed is email.
    pub email_verified: bool,
    /// Name of the preferred channel: "email", "discord", "telegram" or "signal".
    pub preferred_channel: String,
    /// Always `true` in a successful response, since that channel was just verified.
    pub preferred_channel_verified: bool,
}
567
/// Confirms a signup with a verification code and logs the user in.
///
/// Loads the user by DID, verifies the code against the user's preferred channel and
/// the contact identifier stored for it, marks that channel as verified, issues an
/// access/refresh token pair, stores the session and queues a welcome notification
/// (a failure to queue is only logged).
///
/// Responds with `InvalidRequest` for an unknown DID or a bad/mismatched code,
/// `ExpiredTokenMsg` for an expired code, and `InternalError` for database, key or
/// token-creation failures.
///
/// NOTE(review): the verified-flag update and the session insert are separate
/// statements, not one transaction — confirm a partial success is acceptable.
pub async fn confirm_signup(
    State(state): State<AppState>,
    Json(input): Json<ConfirmSignupInput>,
) -> Response {
    info!("confirm_signup called for DID: {}", input.did);
    let row = match sqlx::query!(
        r#"SELECT
            u.id, u.did, u.handle, u.email,
            u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
            u.discord_id, u.telegram_username, u.signal_number,
            k.key_bytes, k.encryption_version
        FROM users u
        JOIN user_keys k ON u.id = k.user_id
        WHERE u.did = $1"#,
        input.did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            warn!("User not found for confirm_signup: {}", input.did);
            return ApiError::InvalidRequest("Invalid DID or verification code".into())
                .into_response();
        }
        Err(e) => {
            error!("Database error in confirm_signup: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };

    // Channel name and the contact identifier the code is checked against; a missing
    // contact value becomes an empty string.
    let (channel_str, identifier) = match row.channel {
        crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
        crate::comms::CommsChannel::Discord => {
            ("discord", row.discord_id.clone().unwrap_or_default())
        }
        crate::comms::CommsChannel::Telegram => (
            "telegram",
            row.telegram_username.clone().unwrap_or_default(),
        ),
        crate::comms::CommsChannel::Signal => {
            ("signal", row.signal_number.clone().unwrap_or_default())
        }
    };

    let normalized_token =
        crate::auth::verification_token::normalize_token_input(&input.verification_code);
    match crate::auth::verification_token::verify_signup_token(
        &normalized_token,
        channel_str,
        &identifier,
    ) {
        Ok(token_data) => {
            // A valid token issued for a different account must not confirm this one.
            if token_data.did != input.did {
                warn!(
                    "Token DID mismatch for confirm_signup: expected {}, got {}",
                    input.did, token_data.did
                );
                return ApiError::InvalidRequest("Invalid verification code".into())
                    .into_response();
            }
        }
        Err(crate::auth::verification_token::VerifyError::Expired) => {
            warn!("Verification code expired for user: {}", input.did);
            return ApiError::ExpiredTokenMsg("Verification code has expired".into())
                .into_response();
        }
        Err(e) => {
            warn!("Invalid verification code for user {}: {:?}", input.did, e);
            return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
        }
    }

    let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
        Ok(k) => k,
        Err(e) => {
            error!("Failed to decrypt user key: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // The column name comes from this fixed match, never from request data, so
    // interpolating it into the SQL text below does not expose user input.
    let verified_column = match row.channel {
        crate::comms::CommsChannel::Email => "email_verified",
        crate::comms::CommsChannel::Discord => "discord_verified",
        crate::comms::CommsChannel::Telegram => "telegram_verified",
        crate::comms::CommsChannel::Signal => "signal_verified",
    };
    let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
    if let Err(e) = sqlx::query(&update_query)
        .bind(&input.did)
        .execute(&state.db)
        .await
    {
        error!("Failed to update verification status: {:?}", e);
        return ApiError::InternalError.into_response();
    }

    let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create access token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
        Ok(m) => m,
        Err(e) => {
            error!("Failed to create refresh token: {:?}", e);
            return ApiError::InternalError.into_response();
        }
    };
    // Typed `None` so the query macro can infer the parameter type for `scope`.
    let no_scope: Option<String> = None;
    if let Err(e) = sqlx::query!(
        "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        row.did,
        access_meta.jti,
        refresh_meta.jti,
        access_meta.expires_at,
        refresh_meta.expires_at,
        false,
        false,
        no_scope
    )
    .execute(&state.db)
    .await
    {
        error!("Failed to insert session: {:?}", e);
        return ApiError::InternalError.into_response();
    }
    let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
    // Best effort: confirmation still succeeds if the welcome message cannot be queued.
    if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
        warn!("Failed to enqueue welcome notification: {:?}", e);
    }
    // Reported as verified only when email is the channel that was just confirmed.
    let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
    let preferred_channel = match row.channel {
        crate::comms::CommsChannel::Email => "email",
        crate::comms::CommsChannel::Discord => "discord",
        crate::comms::CommsChannel::Telegram => "telegram",
        crate::comms::CommsChannel::Signal => "signal",
    };
    Json(ConfirmSignupOutput {
        access_jwt: access_meta.token,
        refresh_jwt: refresh_meta.token,
        handle: row.handle,
        did: row.did,
        email: row.email,
        email_verified,
        preferred_channel: preferred_channel.to_string(),
        preferred_channel_verified: true,
    })
    .into_response()
}
719
/// JSON request body accepted by `resend_verification` (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResendVerificationInput {
    /// DID of the account that should receive a new verification code.
    pub did: String,
}
725
726pub async fn resend_verification(
727 State(state): State<AppState>,
728 Json(input): Json<ResendVerificationInput>,
729) -> Response {
730 info!("resend_verification called for DID: {}", input.did);
731 let row = match sqlx::query!(
732 r#"SELECT
733 id, handle, email,
734 preferred_comms_channel as "channel: crate::comms::CommsChannel",
735 discord_id, telegram_username, signal_number,
736 email_verified, discord_verified, telegram_verified, signal_verified
737 FROM users
738 WHERE did = $1"#,
739 input.did
740 )
741 .fetch_optional(&state.db)
742 .await
743 {
744 Ok(Some(row)) => row,
745 Ok(None) => {
746 return ApiError::InvalidRequest("User not found".into()).into_response();
747 }
748 Err(e) => {
749 error!("Database error in resend_verification: {:?}", e);
750 return ApiError::InternalError.into_response();
751 }
752 };
753 let is_verified =
754 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
755 if is_verified {
756 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
757 }
758
759 let (channel_str, recipient) = match row.channel {
760 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
761 crate::comms::CommsChannel::Discord => {
762 ("discord", row.discord_id.clone().unwrap_or_default())
763 }
764 crate::comms::CommsChannel::Telegram => (
765 "telegram",
766 row.telegram_username.clone().unwrap_or_default(),
767 ),
768 crate::comms::CommsChannel::Signal => {
769 ("signal", row.signal_number.clone().unwrap_or_default())
770 }
771 };
772
773 let verification_token =
774 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
775 let formatted_token =
776 crate::auth::verification_token::format_token_for_display(&verification_token);
777
778 if let Err(e) = crate::comms::enqueue_signup_verification(
779 &state.db,
780 row.id,
781 channel_str,
782 &recipient,
783 &formatted_token,
784 None,
785 )
786 .await
787 {
788 warn!("Failed to enqueue verification notification: {:?}", e);
789 }
790 Json(json!({"success": true})).into_response()
791}
792
/// One entry in the session list returned by `list_sessions` (camelCase keys).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionInfo {
    /// Prefixed row id: `jwt:<id>` for `session_tokens` rows, `oauth:<id>` for
    /// `oauth_token` rows. `revoke_session` accepts the same format.
    pub id: String,
    /// "legacy" for JWT sessions, "oauth" for OAuth tokens.
    pub session_type: String,
    /// Display name derived from the OAuth client id; `None` for JWT sessions.
    pub client_name: Option<String>,
    /// Creation time formatted as RFC 3339.
    pub created_at: String,
    /// Expiry time formatted as RFC 3339.
    pub expires_at: String,
    /// Whether this entry corresponds to the token used for the current request.
    pub is_current: bool,
}
803
/// Response body of `list_sessions`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListSessionsOutput {
    /// JWT and OAuth sessions combined, sorted newest first by creation time.
    pub sessions: Vec<SessionInfo>,
}
809
810pub async fn list_sessions(
811 State(state): State<AppState>,
812 headers: HeaderMap,
813 auth: BearerAuth,
814) -> Response {
815 let current_jti = headers
816 .get("authorization")
817 .and_then(|v| v.to_str().ok())
818 .and_then(|v| v.strip_prefix("Bearer "))
819 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
820
821 let mut sessions: Vec<SessionInfo> = Vec::new();
822
823 let jwt_result = sqlx::query_as::<
824 _,
825 (
826 i32,
827 String,
828 chrono::DateTime<chrono::Utc>,
829 chrono::DateTime<chrono::Utc>,
830 ),
831 >(
832 r#"
833 SELECT id, access_jti, created_at, refresh_expires_at
834 FROM session_tokens
835 WHERE did = $1 AND refresh_expires_at > NOW()
836 ORDER BY created_at DESC
837 "#,
838 )
839 .bind(&auth.0.did)
840 .fetch_all(&state.db)
841 .await;
842
843 match jwt_result {
844 Ok(rows) => {
845 for (id, access_jti, created_at, expires_at) in rows {
846 sessions.push(SessionInfo {
847 id: format!("jwt:{}", id),
848 session_type: "legacy".to_string(),
849 client_name: None,
850 created_at: created_at.to_rfc3339(),
851 expires_at: expires_at.to_rfc3339(),
852 is_current: current_jti.as_ref() == Some(&access_jti),
853 });
854 }
855 }
856 Err(e) => {
857 error!("DB error fetching JWT sessions: {:?}", e);
858 return (
859 StatusCode::INTERNAL_SERVER_ERROR,
860 Json(json!({"error": "InternalError"})),
861 )
862 .into_response();
863 }
864 }
865
866 let oauth_result = sqlx::query_as::<
867 _,
868 (
869 i32,
870 String,
871 chrono::DateTime<chrono::Utc>,
872 chrono::DateTime<chrono::Utc>,
873 String,
874 ),
875 >(
876 r#"
877 SELECT id, token_id, created_at, expires_at, client_id
878 FROM oauth_token
879 WHERE did = $1 AND expires_at > NOW()
880 ORDER BY created_at DESC
881 "#,
882 )
883 .bind(&auth.0.did)
884 .fetch_all(&state.db)
885 .await;
886
887 match oauth_result {
888 Ok(rows) => {
889 for (id, token_id, created_at, expires_at, client_id) in rows {
890 let client_name = extract_client_name(&client_id);
891 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
892 sessions.push(SessionInfo {
893 id: format!("oauth:{}", id),
894 session_type: "oauth".to_string(),
895 client_name: Some(client_name),
896 created_at: created_at.to_rfc3339(),
897 expires_at: expires_at.to_rfc3339(),
898 is_current: is_current_oauth,
899 });
900 }
901 }
902 Err(e) => {
903 error!("DB error fetching OAuth sessions: {:?}", e);
904 return (
905 StatusCode::INTERNAL_SERVER_ERROR,
906 Json(json!({"error": "InternalError"})),
907 )
908 .into_response();
909 }
910 }
911
912 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
913
914 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
915}
916
917fn extract_client_name(client_id: &str) -> String {
918 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
919 "Localhost App".to_string()
920 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
921 parsed.host_str().unwrap_or("Unknown App").to_string()
922 } else {
923 client_id.to_string()
924 }
925}
926
/// JSON request body accepted by `revoke_session` (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Session identifier in the form `jwt:<id>` or `oauth:<id>`, where `<id>` is an
    /// integer row id.
    pub session_id: String,
}
932
933pub async fn revoke_session(
934 State(state): State<AppState>,
935 auth: BearerAuth,
936 Json(input): Json<RevokeSessionInput>,
937) -> Response {
938 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
939 let session_id: i32 = match jwt_id.parse() {
940 Ok(id) => id,
941 Err(_) => {
942 return (
943 StatusCode::BAD_REQUEST,
944 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
945 )
946 .into_response();
947 }
948 };
949 let session = sqlx::query_as::<_, (String,)>(
950 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
951 )
952 .bind(session_id)
953 .bind(&auth.0.did)
954 .fetch_optional(&state.db)
955 .await;
956 let access_jti = match session {
957 Ok(Some((jti,))) => jti,
958 Ok(None) => {
959 return (
960 StatusCode::NOT_FOUND,
961 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
962 )
963 .into_response();
964 }
965 Err(e) => {
966 error!("DB error in revoke_session: {:?}", e);
967 return (
968 StatusCode::INTERNAL_SERVER_ERROR,
969 Json(json!({"error": "InternalError"})),
970 )
971 .into_response();
972 }
973 };
974 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
975 .bind(session_id)
976 .execute(&state.db)
977 .await
978 {
979 error!("DB error deleting session: {:?}", e);
980 return (
981 StatusCode::INTERNAL_SERVER_ERROR,
982 Json(json!({"error": "InternalError"})),
983 )
984 .into_response();
985 }
986 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
987 if let Err(e) = state.cache.delete(&cache_key).await {
988 warn!("Failed to invalidate session cache: {:?}", e);
989 }
990 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
991 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
992 let session_id: i32 = match oauth_id.parse() {
993 Ok(id) => id,
994 Err(_) => {
995 return (
996 StatusCode::BAD_REQUEST,
997 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
998 )
999 .into_response();
1000 }
1001 };
1002 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1003 .bind(session_id)
1004 .bind(&auth.0.did)
1005 .execute(&state.db)
1006 .await;
1007 match result {
1008 Ok(r) if r.rows_affected() == 0 => {
1009 return (
1010 StatusCode::NOT_FOUND,
1011 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1012 )
1013 .into_response();
1014 }
1015 Err(e) => {
1016 error!("DB error deleting OAuth session: {:?}", e);
1017 return (
1018 StatusCode::INTERNAL_SERVER_ERROR,
1019 Json(json!({"error": "InternalError"})),
1020 )
1021 .into_response();
1022 }
1023 _ => {}
1024 }
1025 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1026 } else {
1027 return (
1028 StatusCode::BAD_REQUEST,
1029 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1030 )
1031 .into_response();
1032 }
1033 (StatusCode::OK, Json(json!({}))).into_response()
1034}
1035
1036pub async fn revoke_all_sessions(
1037 State(state): State<AppState>,
1038 headers: HeaderMap,
1039 auth: BearerAuth,
1040) -> Response {
1041 let current_jti = headers
1042 .get("authorization")
1043 .and_then(|v| v.to_str().ok())
1044 .and_then(|v| v.strip_prefix("Bearer "))
1045 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1046
1047 if let Some(ref jti) = current_jti {
1048 if auth.0.is_oauth {
1049 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1050 .bind(&auth.0.did)
1051 .execute(&state.db)
1052 .await
1053 {
1054 error!("DB error revoking JWT sessions: {:?}", e);
1055 return (
1056 StatusCode::INTERNAL_SERVER_ERROR,
1057 Json(json!({"error": "InternalError"})),
1058 )
1059 .into_response();
1060 }
1061 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1062 .bind(&auth.0.did)
1063 .bind(jti)
1064 .execute(&state.db)
1065 .await
1066 {
1067 error!("DB error revoking OAuth sessions: {:?}", e);
1068 return (
1069 StatusCode::INTERNAL_SERVER_ERROR,
1070 Json(json!({"error": "InternalError"})),
1071 )
1072 .into_response();
1073 }
1074 } else {
1075 if let Err(e) =
1076 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1077 .bind(&auth.0.did)
1078 .bind(jti)
1079 .execute(&state.db)
1080 .await
1081 {
1082 error!("DB error revoking JWT sessions: {:?}", e);
1083 return (
1084 StatusCode::INTERNAL_SERVER_ERROR,
1085 Json(json!({"error": "InternalError"})),
1086 )
1087 .into_response();
1088 }
1089 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1090 .bind(&auth.0.did)
1091 .execute(&state.db)
1092 .await
1093 {
1094 error!("DB error revoking OAuth sessions: {:?}", e);
1095 return (
1096 StatusCode::INTERNAL_SERVER_ERROR,
1097 Json(json!({"error": "InternalError"})),
1098 )
1099 .into_response();
1100 }
1101 }
1102 } else {
1103 return (
1104 StatusCode::BAD_REQUEST,
1105 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1106 )
1107 .into_response();
1108 }
1109
1110 info!(did = %auth.0.did, "All other sessions revoked");
1111 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1112}
1113
1114#[derive(Serialize)]
1115#[serde(rename_all = "camelCase")]
1116pub struct LegacyLoginPreferenceOutput {
1117 pub allow_legacy_login: bool,
1118 pub has_mfa: bool,
1119}
1120
1121pub async fn get_legacy_login_preference(
1122 State(state): State<AppState>,
1123 auth: BearerAuth,
1124) -> Response {
1125 let result = sqlx::query!(
1126 r#"SELECT
1127 u.allow_legacy_login,
1128 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1129 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1130 FROM users u WHERE u.did = $1"#,
1131 auth.0.did
1132 )
1133 .fetch_optional(&state.db)
1134 .await;
1135
1136 match result {
1137 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1138 allow_legacy_login: row.allow_legacy_login,
1139 has_mfa: row.has_mfa,
1140 })
1141 .into_response(),
1142 Ok(None) => (
1143 StatusCode::NOT_FOUND,
1144 Json(json!({"error": "AccountNotFound"})),
1145 )
1146 .into_response(),
1147 Err(e) => {
1148 error!("DB error: {:?}", e);
1149 (
1150 StatusCode::INTERNAL_SERVER_ERROR,
1151 Json(json!({"error": "InternalError"})),
1152 )
1153 .into_response()
1154 }
1155 }
1156}
1157
1158#[derive(Deserialize)]
1159#[serde(rename_all = "camelCase")]
1160pub struct UpdateLegacyLoginInput {
1161 pub allow_legacy_login: bool,
1162}
1163
1164pub async fn update_legacy_login_preference(
1165 State(state): State<AppState>,
1166 auth: BearerAuth,
1167 Json(input): Json<UpdateLegacyLoginInput>,
1168) -> Response {
1169 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1170 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1171 .await;
1172 }
1173
1174 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1175 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1176 }
1177
1178 let result = sqlx::query!(
1179 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1180 input.allow_legacy_login,
1181 auth.0.did
1182 )
1183 .fetch_optional(&state.db)
1184 .await;
1185
1186 match result {
1187 Ok(Some(_)) => {
1188 info!(
1189 did = %auth.0.did,
1190 allow_legacy_login = input.allow_legacy_login,
1191 "Legacy login preference updated"
1192 );
1193 Json(json!({
1194 "allowLegacyLogin": input.allow_legacy_login
1195 }))
1196 .into_response()
1197 }
1198 Ok(None) => (
1199 StatusCode::NOT_FOUND,
1200 Json(json!({"error": "AccountNotFound"})),
1201 )
1202 .into_response(),
1203 Err(e) => {
1204 error!("DB error: {:?}", e);
1205 (
1206 StatusCode::INTERNAL_SERVER_ERROR,
1207 Json(json!({"error": "InternalError"})),
1208 )
1209 .into_response()
1210 }
1211 }
1212}
1213
1214use crate::comms::locale::VALID_LOCALES;
1215
1216#[derive(Deserialize)]
1217#[serde(rename_all = "camelCase")]
1218pub struct UpdateLocaleInput {
1219 pub preferred_locale: String,
1220}
1221
1222pub async fn update_locale(
1223 State(state): State<AppState>,
1224 auth: BearerAuth,
1225 Json(input): Json<UpdateLocaleInput>,
1226) -> Response {
1227 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1228 return (
1229 StatusCode::BAD_REQUEST,
1230 Json(json!({
1231 "error": "InvalidRequest",
1232 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1233 })),
1234 )
1235 .into_response();
1236 }
1237
1238 let result = sqlx::query!(
1239 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1240 input.preferred_locale,
1241 auth.0.did
1242 )
1243 .fetch_optional(&state.db)
1244 .await;
1245
1246 match result {
1247 Ok(Some(_)) => {
1248 info!(
1249 did = %auth.0.did,
1250 locale = %input.preferred_locale,
1251 "User locale preference updated"
1252 );
1253 Json(json!({
1254 "preferredLocale": input.preferred_locale
1255 }))
1256 .into_response()
1257 }
1258 Ok(None) => (
1259 StatusCode::NOT_FOUND,
1260 Json(json!({"error": "AccountNotFound"})),
1261 )
1262 .into_response(),
1263 Err(e) => {
1264 error!("DB error updating locale: {:?}", e);
1265 (
1266 StatusCode::INTERNAL_SERVER_ERROR,
1267 Json(json!({"error": "InternalError"})),
1268 )
1269 .into_response()
1270 }
1271 }
1272}