this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
31 let identifier = identifier.trim();
32 if identifier.contains('@') || identifier.starts_with("did:") {
33 identifier.to_string()
34 } else if !identifier.contains('.') {
35 format!("{}.{}", identifier.to_lowercase(), pds_hostname)
36 } else {
37 identifier.to_lowercase()
38 }
39}
40
41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
42 stored_handle.to_string()
43}
44
45#[derive(Deserialize)]
46#[serde(rename_all = "camelCase")]
47pub struct CreateSessionInput {
48 pub identifier: String,
49 pub password: String,
50 #[serde(default)]
51 pub allow_takendown: bool,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateSessionOutput {
57 pub access_jwt: String,
58 pub refresh_jwt: String,
59 pub handle: String,
60 pub did: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub email: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub email_confirmed: Option<bool>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub active: Option<bool>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub status: Option<String>,
71}
72
73pub async fn create_session(
74 State(state): State<AppState>,
75 headers: HeaderMap,
76 Json(input): Json<CreateSessionInput>,
77) -> Response {
78 info!(
79 "create_session called with identifier: {}",
80 input.identifier
81 );
82 let client_ip = extract_client_ip(&headers);
83 if !state
84 .check_rate_limit(RateLimitKind::Login, &client_ip)
85 .await
86 {
87 warn!(ip = %client_ip, "Login rate limit exceeded");
88 return (
89 StatusCode::TOO_MANY_REQUESTS,
90 Json(json!({
91 "error": "RateLimitExceeded",
92 "message": "Too many login attempts. Please try again later."
93 })),
94 )
95 .into_response();
96 }
97 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
98 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
99 info!(
100 "Normalized identifier: {} -> {}",
101 input.identifier, normalized_identifier
102 );
103 let row = match sqlx::query!(
104 r#"SELECT
105 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
106 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
107 u.allow_legacy_login,
108 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
109 k.key_bytes, k.encryption_version,
110 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
111 FROM users u
112 JOIN user_keys k ON u.id = k.user_id
113 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
114 normalized_identifier
115 )
116 .fetch_optional(&state.db)
117 .await
118 {
119 Ok(Some(row)) => row,
120 Ok(None) => {
121 let _ = verify(
122 &input.password,
123 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
124 );
125 warn!("User not found for login attempt");
126 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
127 .into_response();
128 }
129 Err(e) => {
130 error!("Database error fetching user: {:?}", e);
131 return ApiError::InternalError.into_response();
132 }
133 };
134 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
135 Ok(k) => k,
136 Err(e) => {
137 error!("Failed to decrypt user key: {:?}", e);
138 return ApiError::InternalError.into_response();
139 }
140 };
141 let (password_valid, app_password_scopes, app_password_controller) = if row
142 .password_hash
143 .as_ref()
144 .map(|h| verify(&input.password, h).unwrap_or(false))
145 .unwrap_or(false)
146 {
147 (true, None, None)
148 } else {
149 let app_passwords = sqlx::query!(
150 "SELECT password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
151 row.id
152 )
153 .fetch_all(&state.db)
154 .await
155 .unwrap_or_default();
156 let matched = app_passwords
157 .iter()
158 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
159 match matched {
160 Some(app) => (
161 true,
162 app.scopes.clone(),
163 app.created_by_controller_did.clone(),
164 ),
165 None => (false, None, None),
166 }
167 };
168 if !password_valid {
169 warn!("Password verification failed for login attempt");
170 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
171 .into_response();
172 }
173 let is_takendown = row.takedown_ref.is_some();
174 if is_takendown && !input.allow_takendown {
175 warn!("Login attempt for takendown account: {}", row.did);
176 return (
177 StatusCode::UNAUTHORIZED,
178 Json(json!({
179 "error": "AccountTakedown",
180 "message": "Account has been taken down"
181 })),
182 )
183 .into_response();
184 }
185 let is_verified =
186 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
187 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
188 .await
189 .unwrap_or(false);
190 if !is_verified && !is_delegated {
191 warn!("Login attempt for unverified account: {}", row.did);
192 return (
193 StatusCode::FORBIDDEN,
194 Json(json!({
195 "error": "AccountNotVerified",
196 "message": "Please verify your account before logging in",
197 "did": row.did
198 })),
199 )
200 .into_response();
201 }
202 let has_totp = row.totp_enabled.unwrap_or(false);
203 let is_legacy_login = has_totp;
204 if has_totp && !row.allow_legacy_login {
205 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
206 return (
207 StatusCode::FORBIDDEN,
208 Json(json!({
209 "error": "MfaRequired",
210 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
211 "did": row.did
212 })),
213 )
214 .into_response();
215 }
216 let access_meta = match crate::auth::create_access_token_with_delegation(
217 &row.did,
218 &key_bytes,
219 app_password_scopes.as_deref(),
220 app_password_controller.as_deref(),
221 ) {
222 Ok(m) => m,
223 Err(e) => {
224 error!("Failed to create access token: {:?}", e);
225 return ApiError::InternalError.into_response();
226 }
227 };
228 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
229 Ok(m) => m,
230 Err(e) => {
231 error!("Failed to create refresh token: {:?}", e);
232 return ApiError::InternalError.into_response();
233 }
234 };
235 let did_for_doc = row.did.clone();
236 let did_resolver = state.did_resolver.clone();
237 let (insert_result, did_doc) = tokio::join!(
238 sqlx::query!(
239 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
240 row.did,
241 access_meta.jti,
242 refresh_meta.jti,
243 access_meta.expires_at,
244 refresh_meta.expires_at,
245 is_legacy_login,
246 false,
247 app_password_scopes,
248 app_password_controller
249 )
250 .execute(&state.db),
251 did_resolver.resolve_did_document(&did_for_doc)
252 );
253 if let Err(e) = insert_result {
254 error!("Failed to insert session: {:?}", e);
255 return ApiError::InternalError.into_response();
256 }
257 if is_legacy_login {
258 warn!(
259 did = %row.did,
260 ip = %client_ip,
261 "Legacy login on TOTP-enabled account - sending notification"
262 );
263 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
264 if let Err(e) = crate::comms::queue_legacy_login_notification(
265 &state.db,
266 row.id,
267 &hostname,
268 &client_ip,
269 row.preferred_comms_channel,
270 )
271 .await
272 {
273 error!("Failed to queue legacy login notification: {:?}", e);
274 }
275 }
276 let handle = full_handle(&row.handle, &pds_hostname);
277 let is_active = row.deactivated_at.is_none() && !is_takendown;
278 let status = if is_takendown {
279 Some("takendown".to_string())
280 } else if row.deactivated_at.is_some() {
281 Some("deactivated".to_string())
282 } else {
283 None
284 };
285 Json(CreateSessionOutput {
286 access_jwt: access_meta.token,
287 refresh_jwt: refresh_meta.token,
288 handle,
289 did: row.did,
290 did_doc,
291 email: row.email,
292 email_confirmed: Some(row.email_verified),
293 active: Some(is_active),
294 status,
295 })
296 .into_response()
297}
298
299pub async fn get_session(
300 State(state): State<AppState>,
301 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
302) -> Response {
303 let permissions = auth_user.permissions();
304 let can_read_email = permissions.allows_email_read();
305
306 let did_for_doc = auth_user.did.clone();
307 let did_resolver = state.did_resolver.clone();
308 let (db_result, did_doc) = tokio::join!(
309 sqlx::query!(
310 r#"SELECT
311 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
312 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
313 discord_verified, telegram_verified, signal_verified
314 FROM users WHERE did = $1"#,
315 auth_user.did
316 )
317 .fetch_optional(&state.db),
318 did_resolver.resolve_did_document(&did_for_doc)
319 );
320 match db_result {
321 Ok(Some(row)) => {
322 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
323 crate::comms::CommsChannel::Email => ("email", row.email_verified),
324 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
325 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
326 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
327 };
328 let pds_hostname =
329 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
330 let handle = full_handle(&row.handle, &pds_hostname);
331 let is_takendown = row.takedown_ref.is_some();
332 let is_active = row.deactivated_at.is_none() && !is_takendown;
333 let email_value = if can_read_email {
334 row.email.clone()
335 } else {
336 None
337 };
338 let email_confirmed_value = can_read_email && row.email_verified;
339 let mut response = json!({
340 "handle": handle,
341 "did": auth_user.did,
342 "active": is_active,
343 "preferredChannel": preferred_channel,
344 "preferredChannelVerified": preferred_channel_verified,
345 "preferredLocale": row.preferred_locale,
346 "isAdmin": row.is_admin
347 });
348 if can_read_email {
349 response["email"] = json!(email_value);
350 response["emailConfirmed"] = json!(email_confirmed_value);
351 }
352 if is_takendown {
353 response["status"] = json!("takendown");
354 } else if row.deactivated_at.is_some() {
355 response["status"] = json!("deactivated");
356 }
357 if let Some(doc) = did_doc {
358 response["didDoc"] = doc;
359 }
360 Json(response)
361 .into_response()
362 }
363 Ok(None) => ApiError::AuthenticationFailed.into_response(),
364 Err(e) => {
365 error!("Database error in get_session: {:?}", e);
366 ApiError::InternalError.into_response()
367 }
368 }
369}
370
371pub async fn delete_session(
372 State(state): State<AppState>,
373 headers: axum::http::HeaderMap,
374) -> Response {
375 let token = match crate::auth::extract_bearer_token_from_header(
376 headers.get("Authorization").and_then(|h| h.to_str().ok()),
377 ) {
378 Some(t) => t,
379 None => return ApiError::AuthenticationRequired.into_response(),
380 };
381 let jti = match crate::auth::get_jti_from_token(&token) {
382 Ok(jti) => jti,
383 Err(_) => return ApiError::AuthenticationFailed.into_response(),
384 };
385 let did = crate::auth::get_did_from_token(&token).ok();
386 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
387 .execute(&state.db)
388 .await
389 {
390 Ok(res) if res.rows_affected() > 0 => {
391 if let Some(did) = did {
392 let session_cache_key = format!("auth:session:{}:{}", did, jti);
393 let _ = state.cache.delete(&session_cache_key).await;
394 }
395 Json(json!({})).into_response()
396 }
397 Ok(_) => ApiError::AuthenticationFailed.into_response(),
398 Err(e) => {
399 error!("Database error in delete_session: {:?}", e);
400 ApiError::AuthenticationFailed.into_response()
401 }
402 }
403}
404
405pub async fn refresh_session(
406 State(state): State<AppState>,
407 headers: axum::http::HeaderMap,
408) -> Response {
409 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
410 if !state
411 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
412 .await
413 {
414 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
415 return (
416 axum::http::StatusCode::TOO_MANY_REQUESTS,
417 axum::Json(serde_json::json!({
418 "error": "RateLimitExceeded",
419 "message": "Too many requests. Please try again later."
420 })),
421 )
422 .into_response();
423 }
424 let refresh_token = match crate::auth::extract_bearer_token_from_header(
425 headers.get("Authorization").and_then(|h| h.to_str().ok()),
426 ) {
427 Some(t) => t,
428 None => return ApiError::AuthenticationRequired.into_response(),
429 };
430 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
431 Ok(jti) => jti,
432 Err(_) => {
433 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
434 .into_response();
435 }
436 };
437 let mut tx = match state.db.begin().await {
438 Ok(tx) => tx,
439 Err(e) => {
440 error!("Failed to begin transaction: {:?}", e);
441 return ApiError::InternalError.into_response();
442 }
443 };
444 if let Ok(Some(session_id)) = sqlx::query_scalar!(
445 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
446 refresh_jti
447 )
448 .fetch_optional(&mut *tx)
449 .await
450 {
451 warn!(
452 "Refresh token reuse detected! Revoking token family for session_id: {}",
453 session_id
454 );
455 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
456 .execute(&mut *tx)
457 .await;
458 let _ = tx.commit().await;
459 return ApiError::ExpiredTokenMsg(
460 "Refresh token has been revoked due to suspected compromise".into(),
461 )
462 .into_response();
463 }
464 let session_row = match sqlx::query!(
465 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
466 FROM session_tokens st
467 JOIN users u ON st.did = u.did
468 JOIN user_keys k ON u.id = k.user_id
469 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
470 FOR UPDATE OF st"#,
471 refresh_jti
472 )
473 .fetch_optional(&mut *tx)
474 .await
475 {
476 Ok(Some(row)) => row,
477 Ok(None) => {
478 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
479 .into_response();
480 }
481 Err(e) => {
482 error!("Database error fetching session: {:?}", e);
483 return ApiError::InternalError.into_response();
484 }
485 };
486 let key_bytes =
487 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
488 Ok(k) => k,
489 Err(e) => {
490 error!("Failed to decrypt user key: {:?}", e);
491 return ApiError::InternalError.into_response();
492 }
493 };
494 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
495 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
496 }
497 let new_access_meta = match crate::auth::create_access_token_with_delegation(
498 &session_row.did,
499 &key_bytes,
500 session_row.scope.as_deref(),
501 session_row.controller_did.as_deref(),
502 ) {
503 Ok(m) => m,
504 Err(e) => {
505 error!("Failed to create access token: {:?}", e);
506 return ApiError::InternalError.into_response();
507 }
508 };
509 let new_refresh_meta =
510 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
511 Ok(m) => m,
512 Err(e) => {
513 error!("Failed to create refresh token: {:?}", e);
514 return ApiError::InternalError.into_response();
515 }
516 };
517 match sqlx::query!(
518 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
519 refresh_jti,
520 session_row.id
521 )
522 .execute(&mut *tx)
523 .await
524 {
525 Ok(result) if result.rows_affected() == 0 => {
526 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
527 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
528 .execute(&mut *tx)
529 .await;
530 let _ = tx.commit().await;
531 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
532 }
533 Err(e) => {
534 error!("Failed to record used refresh token: {:?}", e);
535 return ApiError::InternalError.into_response();
536 }
537 Ok(_) => {}
538 }
539 if let Err(e) = sqlx::query!(
540 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
541 new_access_meta.jti,
542 new_refresh_meta.jti,
543 new_access_meta.expires_at,
544 new_refresh_meta.expires_at,
545 session_row.id
546 )
547 .execute(&mut *tx)
548 .await
549 {
550 error!("Database error updating session: {:?}", e);
551 return ApiError::InternalError.into_response();
552 }
553 if let Err(e) = tx.commit().await {
554 error!("Failed to commit transaction: {:?}", e);
555 return ApiError::InternalError.into_response();
556 }
557 let did_for_doc = session_row.did.clone();
558 let did_resolver = state.did_resolver.clone();
559 let (db_result, did_doc) = tokio::join!(
560 sqlx::query!(
561 r#"SELECT
562 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
563 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
564 discord_verified, telegram_verified, signal_verified
565 FROM users WHERE did = $1"#,
566 session_row.did
567 )
568 .fetch_optional(&state.db),
569 did_resolver.resolve_did_document(&did_for_doc)
570 );
571 match db_result {
572 Ok(Some(u)) => {
573 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
574 crate::comms::CommsChannel::Email => ("email", u.email_verified),
575 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
576 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
577 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
578 };
579 let pds_hostname =
580 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
581 let handle = full_handle(&u.handle, &pds_hostname);
582 let is_takendown = u.takedown_ref.is_some();
583 let is_active = u.deactivated_at.is_none() && !is_takendown;
584 let mut response = json!({
585 "accessJwt": new_access_meta.token,
586 "refreshJwt": new_refresh_meta.token,
587 "handle": handle,
588 "did": session_row.did,
589 "email": u.email,
590 "emailConfirmed": u.email_verified,
591 "preferredChannel": preferred_channel,
592 "preferredChannelVerified": preferred_channel_verified,
593 "preferredLocale": u.preferred_locale,
594 "isAdmin": u.is_admin,
595 "active": is_active
596 });
597 if let Some(doc) = did_doc {
598 response["didDoc"] = doc;
599 }
600 if is_takendown {
601 response["status"] = json!("takendown");
602 } else if u.deactivated_at.is_some() {
603 response["status"] = json!("deactivated");
604 }
605 Json(response)
606 .into_response()
607 }
608 Ok(None) => {
609 error!("User not found for existing session: {}", session_row.did);
610 ApiError::InternalError.into_response()
611 }
612 Err(e) => {
613 error!("Database error fetching user: {:?}", e);
614 ApiError::InternalError.into_response()
615 }
616 }
617}
618
619#[derive(Deserialize)]
620#[serde(rename_all = "camelCase")]
621pub struct ConfirmSignupInput {
622 pub did: String,
623 pub verification_code: String,
624}
625
626#[derive(Serialize)]
627#[serde(rename_all = "camelCase")]
628pub struct ConfirmSignupOutput {
629 pub access_jwt: String,
630 pub refresh_jwt: String,
631 pub handle: String,
632 pub did: String,
633 pub email: Option<String>,
634 pub email_verified: bool,
635 pub preferred_channel: String,
636 pub preferred_channel_verified: bool,
637}
638
639pub async fn confirm_signup(
640 State(state): State<AppState>,
641 Json(input): Json<ConfirmSignupInput>,
642) -> Response {
643 info!("confirm_signup called for DID: {}", input.did);
644 let row = match sqlx::query!(
645 r#"SELECT
646 u.id, u.did, u.handle, u.email,
647 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
648 u.discord_id, u.telegram_username, u.signal_number,
649 k.key_bytes, k.encryption_version
650 FROM users u
651 JOIN user_keys k ON u.id = k.user_id
652 WHERE u.did = $1"#,
653 input.did
654 )
655 .fetch_optional(&state.db)
656 .await
657 {
658 Ok(Some(row)) => row,
659 Ok(None) => {
660 warn!("User not found for confirm_signup: {}", input.did);
661 return ApiError::InvalidRequest("Invalid DID or verification code".into())
662 .into_response();
663 }
664 Err(e) => {
665 error!("Database error in confirm_signup: {:?}", e);
666 return ApiError::InternalError.into_response();
667 }
668 };
669
670 let (channel_str, identifier) = match row.channel {
671 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
672 crate::comms::CommsChannel::Discord => {
673 ("discord", row.discord_id.clone().unwrap_or_default())
674 }
675 crate::comms::CommsChannel::Telegram => (
676 "telegram",
677 row.telegram_username.clone().unwrap_or_default(),
678 ),
679 crate::comms::CommsChannel::Signal => {
680 ("signal", row.signal_number.clone().unwrap_or_default())
681 }
682 };
683
684 let normalized_token =
685 crate::auth::verification_token::normalize_token_input(&input.verification_code);
686 match crate::auth::verification_token::verify_signup_token(
687 &normalized_token,
688 channel_str,
689 &identifier,
690 ) {
691 Ok(token_data) => {
692 if token_data.did != input.did {
693 warn!(
694 "Token DID mismatch for confirm_signup: expected {}, got {}",
695 input.did, token_data.did
696 );
697 return ApiError::InvalidRequest("Invalid verification code".into())
698 .into_response();
699 }
700 }
701 Err(crate::auth::verification_token::VerifyError::Expired) => {
702 warn!("Verification code expired for user: {}", input.did);
703 return ApiError::ExpiredTokenMsg("Verification code has expired".into())
704 .into_response();
705 }
706 Err(e) => {
707 warn!("Invalid verification code for user {}: {:?}", input.did, e);
708 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
709 }
710 }
711
712 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
713 Ok(k) => k,
714 Err(e) => {
715 error!("Failed to decrypt user key: {:?}", e);
716 return ApiError::InternalError.into_response();
717 }
718 };
719 let verified_column = match row.channel {
720 crate::comms::CommsChannel::Email => "email_verified",
721 crate::comms::CommsChannel::Discord => "discord_verified",
722 crate::comms::CommsChannel::Telegram => "telegram_verified",
723 crate::comms::CommsChannel::Signal => "signal_verified",
724 };
725 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
726 if let Err(e) = sqlx::query(&update_query)
727 .bind(&input.did)
728 .execute(&state.db)
729 .await
730 {
731 error!("Failed to update verification status: {:?}", e);
732 return ApiError::InternalError.into_response();
733 }
734
735 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
736 Ok(m) => m,
737 Err(e) => {
738 error!("Failed to create access token: {:?}", e);
739 return ApiError::InternalError.into_response();
740 }
741 };
742 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
743 Ok(m) => m,
744 Err(e) => {
745 error!("Failed to create refresh token: {:?}", e);
746 return ApiError::InternalError.into_response();
747 }
748 };
749 let no_scope: Option<String> = None;
750 if let Err(e) = sqlx::query!(
751 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
752 row.did,
753 access_meta.jti,
754 refresh_meta.jti,
755 access_meta.expires_at,
756 refresh_meta.expires_at,
757 false,
758 false,
759 no_scope
760 )
761 .execute(&state.db)
762 .await
763 {
764 error!("Failed to insert session: {:?}", e);
765 return ApiError::InternalError.into_response();
766 }
767 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
768 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
769 warn!("Failed to enqueue welcome notification: {:?}", e);
770 }
771 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
772 let preferred_channel = match row.channel {
773 crate::comms::CommsChannel::Email => "email",
774 crate::comms::CommsChannel::Discord => "discord",
775 crate::comms::CommsChannel::Telegram => "telegram",
776 crate::comms::CommsChannel::Signal => "signal",
777 };
778 Json(ConfirmSignupOutput {
779 access_jwt: access_meta.token,
780 refresh_jwt: refresh_meta.token,
781 handle: row.handle,
782 did: row.did,
783 email: row.email,
784 email_verified,
785 preferred_channel: preferred_channel.to_string(),
786 preferred_channel_verified: true,
787 })
788 .into_response()
789}
790
791#[derive(Deserialize)]
792#[serde(rename_all = "camelCase")]
793pub struct ResendVerificationInput {
794 pub did: String,
795}
796
797pub async fn resend_verification(
798 State(state): State<AppState>,
799 Json(input): Json<ResendVerificationInput>,
800) -> Response {
801 info!("resend_verification called for DID: {}", input.did);
802 let row = match sqlx::query!(
803 r#"SELECT
804 id, handle, email,
805 preferred_comms_channel as "channel: crate::comms::CommsChannel",
806 discord_id, telegram_username, signal_number,
807 email_verified, discord_verified, telegram_verified, signal_verified
808 FROM users
809 WHERE did = $1"#,
810 input.did
811 )
812 .fetch_optional(&state.db)
813 .await
814 {
815 Ok(Some(row)) => row,
816 Ok(None) => {
817 return ApiError::InvalidRequest("User not found".into()).into_response();
818 }
819 Err(e) => {
820 error!("Database error in resend_verification: {:?}", e);
821 return ApiError::InternalError.into_response();
822 }
823 };
824 let is_verified =
825 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
826 if is_verified {
827 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
828 }
829
830 let (channel_str, recipient) = match row.channel {
831 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
832 crate::comms::CommsChannel::Discord => {
833 ("discord", row.discord_id.clone().unwrap_or_default())
834 }
835 crate::comms::CommsChannel::Telegram => (
836 "telegram",
837 row.telegram_username.clone().unwrap_or_default(),
838 ),
839 crate::comms::CommsChannel::Signal => {
840 ("signal", row.signal_number.clone().unwrap_or_default())
841 }
842 };
843
844 let verification_token =
845 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
846 let formatted_token =
847 crate::auth::verification_token::format_token_for_display(&verification_token);
848
849 if let Err(e) = crate::comms::enqueue_signup_verification(
850 &state.db,
851 row.id,
852 channel_str,
853 &recipient,
854 &formatted_token,
855 None,
856 )
857 .await
858 {
859 warn!("Failed to enqueue verification notification: {:?}", e);
860 }
861 Json(json!({"success": true})).into_response()
862}
863
864#[derive(Serialize)]
865#[serde(rename_all = "camelCase")]
866pub struct SessionInfo {
867 pub id: String,
868 pub session_type: String,
869 pub client_name: Option<String>,
870 pub created_at: String,
871 pub expires_at: String,
872 pub is_current: bool,
873}
874
875#[derive(Serialize)]
876#[serde(rename_all = "camelCase")]
877pub struct ListSessionsOutput {
878 pub sessions: Vec<SessionInfo>,
879}
880
881pub async fn list_sessions(
882 State(state): State<AppState>,
883 headers: HeaderMap,
884 auth: BearerAuth,
885) -> Response {
886 let current_jti = headers
887 .get("authorization")
888 .and_then(|v| v.to_str().ok())
889 .and_then(|v| v.strip_prefix("Bearer "))
890 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
891
892 let mut sessions: Vec<SessionInfo> = Vec::new();
893
894 let jwt_result = sqlx::query_as::<
895 _,
896 (
897 i32,
898 String,
899 chrono::DateTime<chrono::Utc>,
900 chrono::DateTime<chrono::Utc>,
901 ),
902 >(
903 r#"
904 SELECT id, access_jti, created_at, refresh_expires_at
905 FROM session_tokens
906 WHERE did = $1 AND refresh_expires_at > NOW()
907 ORDER BY created_at DESC
908 "#,
909 )
910 .bind(&auth.0.did)
911 .fetch_all(&state.db)
912 .await;
913
914 match jwt_result {
915 Ok(rows) => {
916 for (id, access_jti, created_at, expires_at) in rows {
917 sessions.push(SessionInfo {
918 id: format!("jwt:{}", id),
919 session_type: "legacy".to_string(),
920 client_name: None,
921 created_at: created_at.to_rfc3339(),
922 expires_at: expires_at.to_rfc3339(),
923 is_current: current_jti.as_ref() == Some(&access_jti),
924 });
925 }
926 }
927 Err(e) => {
928 error!("DB error fetching JWT sessions: {:?}", e);
929 return (
930 StatusCode::INTERNAL_SERVER_ERROR,
931 Json(json!({"error": "InternalError"})),
932 )
933 .into_response();
934 }
935 }
936
937 let oauth_result = sqlx::query_as::<
938 _,
939 (
940 i32,
941 String,
942 chrono::DateTime<chrono::Utc>,
943 chrono::DateTime<chrono::Utc>,
944 String,
945 ),
946 >(
947 r#"
948 SELECT id, token_id, created_at, expires_at, client_id
949 FROM oauth_token
950 WHERE did = $1 AND expires_at > NOW()
951 ORDER BY created_at DESC
952 "#,
953 )
954 .bind(&auth.0.did)
955 .fetch_all(&state.db)
956 .await;
957
958 match oauth_result {
959 Ok(rows) => {
960 for (id, token_id, created_at, expires_at, client_id) in rows {
961 let client_name = extract_client_name(&client_id);
962 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
963 sessions.push(SessionInfo {
964 id: format!("oauth:{}", id),
965 session_type: "oauth".to_string(),
966 client_name: Some(client_name),
967 created_at: created_at.to_rfc3339(),
968 expires_at: expires_at.to_rfc3339(),
969 is_current: is_current_oauth,
970 });
971 }
972 }
973 Err(e) => {
974 error!("DB error fetching OAuth sessions: {:?}", e);
975 return (
976 StatusCode::INTERNAL_SERVER_ERROR,
977 Json(json!({"error": "InternalError"})),
978 )
979 .into_response();
980 }
981 }
982
983 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
984
985 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
986}
987
988fn extract_client_name(client_id: &str) -> String {
989 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
990 "Localhost App".to_string()
991 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
992 parsed.host_str().unwrap_or("Unknown App").to_string()
993 } else {
994 client_id.to_string()
995 }
996}
997
998#[derive(Deserialize)]
999#[serde(rename_all = "camelCase")]
1000pub struct RevokeSessionInput {
1001 pub session_id: String,
1002}
1003
1004pub async fn revoke_session(
1005 State(state): State<AppState>,
1006 auth: BearerAuth,
1007 Json(input): Json<RevokeSessionInput>,
1008) -> Response {
1009 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
1010 let session_id: i32 = match jwt_id.parse() {
1011 Ok(id) => id,
1012 Err(_) => {
1013 return (
1014 StatusCode::BAD_REQUEST,
1015 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1016 )
1017 .into_response();
1018 }
1019 };
1020 let session = sqlx::query_as::<_, (String,)>(
1021 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
1022 )
1023 .bind(session_id)
1024 .bind(&auth.0.did)
1025 .fetch_optional(&state.db)
1026 .await;
1027 let access_jti = match session {
1028 Ok(Some((jti,))) => jti,
1029 Ok(None) => {
1030 return (
1031 StatusCode::NOT_FOUND,
1032 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1033 )
1034 .into_response();
1035 }
1036 Err(e) => {
1037 error!("DB error in revoke_session: {:?}", e);
1038 return (
1039 StatusCode::INTERNAL_SERVER_ERROR,
1040 Json(json!({"error": "InternalError"})),
1041 )
1042 .into_response();
1043 }
1044 };
1045 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1046 .bind(session_id)
1047 .execute(&state.db)
1048 .await
1049 {
1050 error!("DB error deleting session: {:?}", e);
1051 return (
1052 StatusCode::INTERNAL_SERVER_ERROR,
1053 Json(json!({"error": "InternalError"})),
1054 )
1055 .into_response();
1056 }
1057 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
1058 if let Err(e) = state.cache.delete(&cache_key).await {
1059 warn!("Failed to invalidate session cache: {:?}", e);
1060 }
1061 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
1062 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1063 let session_id: i32 = match oauth_id.parse() {
1064 Ok(id) => id,
1065 Err(_) => {
1066 return (
1067 StatusCode::BAD_REQUEST,
1068 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1069 )
1070 .into_response();
1071 }
1072 };
1073 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1074 .bind(session_id)
1075 .bind(&auth.0.did)
1076 .execute(&state.db)
1077 .await;
1078 match result {
1079 Ok(r) if r.rows_affected() == 0 => {
1080 return (
1081 StatusCode::NOT_FOUND,
1082 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1083 )
1084 .into_response();
1085 }
1086 Err(e) => {
1087 error!("DB error deleting OAuth session: {:?}", e);
1088 return (
1089 StatusCode::INTERNAL_SERVER_ERROR,
1090 Json(json!({"error": "InternalError"})),
1091 )
1092 .into_response();
1093 }
1094 _ => {}
1095 }
1096 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1097 } else {
1098 return (
1099 StatusCode::BAD_REQUEST,
1100 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1101 )
1102 .into_response();
1103 }
1104 (StatusCode::OK, Json(json!({}))).into_response()
1105}
1106
1107pub async fn revoke_all_sessions(
1108 State(state): State<AppState>,
1109 headers: HeaderMap,
1110 auth: BearerAuth,
1111) -> Response {
1112 let current_jti = headers
1113 .get("authorization")
1114 .and_then(|v| v.to_str().ok())
1115 .and_then(|v| v.strip_prefix("Bearer "))
1116 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1117
1118 if let Some(ref jti) = current_jti {
1119 if auth.0.is_oauth {
1120 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1121 .bind(&auth.0.did)
1122 .execute(&state.db)
1123 .await
1124 {
1125 error!("DB error revoking JWT sessions: {:?}", e);
1126 return (
1127 StatusCode::INTERNAL_SERVER_ERROR,
1128 Json(json!({"error": "InternalError"})),
1129 )
1130 .into_response();
1131 }
1132 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1133 .bind(&auth.0.did)
1134 .bind(jti)
1135 .execute(&state.db)
1136 .await
1137 {
1138 error!("DB error revoking OAuth sessions: {:?}", e);
1139 return (
1140 StatusCode::INTERNAL_SERVER_ERROR,
1141 Json(json!({"error": "InternalError"})),
1142 )
1143 .into_response();
1144 }
1145 } else {
1146 if let Err(e) =
1147 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1148 .bind(&auth.0.did)
1149 .bind(jti)
1150 .execute(&state.db)
1151 .await
1152 {
1153 error!("DB error revoking JWT sessions: {:?}", e);
1154 return (
1155 StatusCode::INTERNAL_SERVER_ERROR,
1156 Json(json!({"error": "InternalError"})),
1157 )
1158 .into_response();
1159 }
1160 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1161 .bind(&auth.0.did)
1162 .execute(&state.db)
1163 .await
1164 {
1165 error!("DB error revoking OAuth sessions: {:?}", e);
1166 return (
1167 StatusCode::INTERNAL_SERVER_ERROR,
1168 Json(json!({"error": "InternalError"})),
1169 )
1170 .into_response();
1171 }
1172 }
1173 } else {
1174 return (
1175 StatusCode::BAD_REQUEST,
1176 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1177 )
1178 .into_response();
1179 }
1180
1181 info!(did = %auth.0.did, "All other sessions revoked");
1182 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1183}
1184
1185#[derive(Serialize)]
1186#[serde(rename_all = "camelCase")]
1187pub struct LegacyLoginPreferenceOutput {
1188 pub allow_legacy_login: bool,
1189 pub has_mfa: bool,
1190}
1191
1192pub async fn get_legacy_login_preference(
1193 State(state): State<AppState>,
1194 auth: BearerAuth,
1195) -> Response {
1196 let result = sqlx::query!(
1197 r#"SELECT
1198 u.allow_legacy_login,
1199 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1200 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1201 FROM users u WHERE u.did = $1"#,
1202 auth.0.did
1203 )
1204 .fetch_optional(&state.db)
1205 .await;
1206
1207 match result {
1208 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1209 allow_legacy_login: row.allow_legacy_login,
1210 has_mfa: row.has_mfa,
1211 })
1212 .into_response(),
1213 Ok(None) => (
1214 StatusCode::NOT_FOUND,
1215 Json(json!({"error": "AccountNotFound"})),
1216 )
1217 .into_response(),
1218 Err(e) => {
1219 error!("DB error: {:?}", e);
1220 (
1221 StatusCode::INTERNAL_SERVER_ERROR,
1222 Json(json!({"error": "InternalError"})),
1223 )
1224 .into_response()
1225 }
1226 }
1227}
1228
1229#[derive(Deserialize)]
1230#[serde(rename_all = "camelCase")]
1231pub struct UpdateLegacyLoginInput {
1232 pub allow_legacy_login: bool,
1233}
1234
1235pub async fn update_legacy_login_preference(
1236 State(state): State<AppState>,
1237 auth: BearerAuth,
1238 Json(input): Json<UpdateLegacyLoginInput>,
1239) -> Response {
1240 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1241 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1242 .await;
1243 }
1244
1245 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1246 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1247 }
1248
1249 let result = sqlx::query!(
1250 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1251 input.allow_legacy_login,
1252 auth.0.did
1253 )
1254 .fetch_optional(&state.db)
1255 .await;
1256
1257 match result {
1258 Ok(Some(_)) => {
1259 info!(
1260 did = %auth.0.did,
1261 allow_legacy_login = input.allow_legacy_login,
1262 "Legacy login preference updated"
1263 );
1264 Json(json!({
1265 "allowLegacyLogin": input.allow_legacy_login
1266 }))
1267 .into_response()
1268 }
1269 Ok(None) => (
1270 StatusCode::NOT_FOUND,
1271 Json(json!({"error": "AccountNotFound"})),
1272 )
1273 .into_response(),
1274 Err(e) => {
1275 error!("DB error: {:?}", e);
1276 (
1277 StatusCode::INTERNAL_SERVER_ERROR,
1278 Json(json!({"error": "InternalError"})),
1279 )
1280 .into_response()
1281 }
1282 }
1283}
1284
1285use crate::comms::locale::VALID_LOCALES;
1286
1287#[derive(Deserialize)]
1288#[serde(rename_all = "camelCase")]
1289pub struct UpdateLocaleInput {
1290 pub preferred_locale: String,
1291}
1292
1293pub async fn update_locale(
1294 State(state): State<AppState>,
1295 auth: BearerAuth,
1296 Json(input): Json<UpdateLocaleInput>,
1297) -> Response {
1298 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1299 return (
1300 StatusCode::BAD_REQUEST,
1301 Json(json!({
1302 "error": "InvalidRequest",
1303 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1304 })),
1305 )
1306 .into_response();
1307 }
1308
1309 let result = sqlx::query!(
1310 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1311 input.preferred_locale,
1312 auth.0.did
1313 )
1314 .fetch_optional(&state.db)
1315 .await;
1316
1317 match result {
1318 Ok(Some(_)) => {
1319 info!(
1320 did = %auth.0.did,
1321 locale = %input.preferred_locale,
1322 "User locale preference updated"
1323 );
1324 Json(json!({
1325 "preferredLocale": input.preferred_locale
1326 }))
1327 .into_response()
1328 }
1329 Ok(None) => (
1330 StatusCode::NOT_FOUND,
1331 Json(json!({"error": "AccountNotFound"})),
1332 )
1333 .into_response(),
1334 Err(e) => {
1335 error!("DB error updating locale: {:?}", e);
1336 (
1337 StatusCode::INTERNAL_SERVER_ERROR,
1338 Json(json!({"error": "InternalError"})),
1339 )
1340 .into_response()
1341 }
1342 }
1343}