this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14
15fn extract_client_ip(headers: &HeaderMap) -> String {
16 if let Some(forwarded) = headers.get("x-forwarded-for")
17 && let Ok(value) = forwarded.to_str()
18 && let Some(first_ip) = value.split(',').next()
19 {
20 return first_ip.trim().to_string();
21 }
22 if let Some(real_ip) = headers.get("x-real-ip")
23 && let Ok(value) = real_ip.to_str()
24 {
25 return value.trim().to_string();
26 }
27 "unknown".to_string()
28}
29
30fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
31 let identifier = identifier.trim();
32 if identifier.contains('@') || identifier.starts_with("did:") {
33 identifier.to_string()
34 } else if !identifier.contains('.') {
35 format!("{}.{}", identifier.to_lowercase(), pds_hostname)
36 } else {
37 identifier.to_lowercase()
38 }
39}
40
41fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
42 stored_handle.to_string()
43}
44
45#[derive(Deserialize)]
46#[serde(rename_all = "camelCase")]
47pub struct CreateSessionInput {
48 pub identifier: String,
49 pub password: String,
50 #[serde(default)]
51 pub allow_takendown: bool,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateSessionOutput {
57 pub access_jwt: String,
58 pub refresh_jwt: String,
59 pub handle: String,
60 pub did: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub did_doc: Option<serde_json::Value>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub email: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub email_confirmed: Option<bool>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub active: Option<bool>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub status: Option<String>,
71}
72
73pub async fn create_session(
74 State(state): State<AppState>,
75 headers: HeaderMap,
76 Json(input): Json<CreateSessionInput>,
77) -> Response {
78 info!(
79 "create_session called with identifier: {}",
80 input.identifier
81 );
82 let client_ip = extract_client_ip(&headers);
83 if !state
84 .check_rate_limit(RateLimitKind::Login, &client_ip)
85 .await
86 {
87 warn!(ip = %client_ip, "Login rate limit exceeded");
88 return (
89 StatusCode::TOO_MANY_REQUESTS,
90 Json(json!({
91 "error": "RateLimitExceeded",
92 "message": "Too many login attempts. Please try again later."
93 })),
94 )
95 .into_response();
96 }
97 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
98 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
99 info!(
100 "Normalized identifier: {} -> {}",
101 input.identifier, normalized_identifier
102 );
103 let row = match sqlx::query!(
104 r#"SELECT
105 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
106 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
107 u.allow_legacy_login,
108 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
109 k.key_bytes, k.encryption_version,
110 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
111 FROM users u
112 JOIN user_keys k ON u.id = k.user_id
113 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
114 normalized_identifier
115 )
116 .fetch_optional(&state.db)
117 .await
118 {
119 Ok(Some(row)) => row,
120 Ok(None) => {
121 let _ = verify(
122 &input.password,
123 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
124 );
125 warn!("User not found for login attempt");
126 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
127 .into_response();
128 }
129 Err(e) => {
130 error!("Database error fetching user: {:?}", e);
131 return ApiError::InternalError.into_response();
132 }
133 };
134 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
135 Ok(k) => k,
136 Err(e) => {
137 error!("Failed to decrypt user key: {:?}", e);
138 return ApiError::InternalError.into_response();
139 }
140 };
141 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
142 .password_hash
143 .as_ref()
144 .map(|h| verify(&input.password, h).unwrap_or(false))
145 .unwrap_or(false)
146 {
147 (true, None, None, None)
148 } else {
149 let app_passwords = sqlx::query!(
150 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
151 row.id
152 )
153 .fetch_all(&state.db)
154 .await
155 .unwrap_or_default();
156 let matched = app_passwords
157 .iter()
158 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
159 match matched {
160 Some(app) => (
161 true,
162 Some(app.name.clone()),
163 app.scopes.clone(),
164 app.created_by_controller_did.clone(),
165 ),
166 None => (false, None, None, None),
167 }
168 };
169 if !password_valid {
170 warn!("Password verification failed for login attempt");
171 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
172 .into_response();
173 }
174 let is_takendown = row.takedown_ref.is_some();
175 if is_takendown && !input.allow_takendown {
176 warn!("Login attempt for takendown account: {}", row.did);
177 return (
178 StatusCode::UNAUTHORIZED,
179 Json(json!({
180 "error": "AccountTakedown",
181 "message": "Account has been taken down"
182 })),
183 )
184 .into_response();
185 }
186 let is_verified =
187 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
188 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
189 .await
190 .unwrap_or(false);
191 if !is_verified && !is_delegated {
192 warn!("Login attempt for unverified account: {}", row.did);
193 return (
194 StatusCode::FORBIDDEN,
195 Json(json!({
196 "error": "AccountNotVerified",
197 "message": "Please verify your account before logging in",
198 "did": row.did
199 })),
200 )
201 .into_response();
202 }
203 let has_totp = row.totp_enabled.unwrap_or(false);
204 let is_legacy_login = has_totp;
205 if has_totp && !row.allow_legacy_login {
206 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
207 return (
208 StatusCode::FORBIDDEN,
209 Json(json!({
210 "error": "MfaRequired",
211 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
212 "did": row.did
213 })),
214 )
215 .into_response();
216 }
217 let access_meta = match crate::auth::create_access_token_with_delegation(
218 &row.did,
219 &key_bytes,
220 app_password_scopes.as_deref(),
221 app_password_controller.as_deref(),
222 ) {
223 Ok(m) => m,
224 Err(e) => {
225 error!("Failed to create access token: {:?}", e);
226 return ApiError::InternalError.into_response();
227 }
228 };
229 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
230 Ok(m) => m,
231 Err(e) => {
232 error!("Failed to create refresh token: {:?}", e);
233 return ApiError::InternalError.into_response();
234 }
235 };
236 let did_for_doc = row.did.clone();
237 let did_resolver = state.did_resolver.clone();
238 let (insert_result, did_doc) = tokio::join!(
239 sqlx::query!(
240 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
241 row.did,
242 access_meta.jti,
243 refresh_meta.jti,
244 access_meta.expires_at,
245 refresh_meta.expires_at,
246 is_legacy_login,
247 false,
248 app_password_scopes,
249 app_password_controller,
250 app_password_name
251 )
252 .execute(&state.db),
253 did_resolver.resolve_did_document(&did_for_doc)
254 );
255 if let Err(e) = insert_result {
256 error!("Failed to insert session: {:?}", e);
257 return ApiError::InternalError.into_response();
258 }
259 if is_legacy_login {
260 warn!(
261 did = %row.did,
262 ip = %client_ip,
263 "Legacy login on TOTP-enabled account - sending notification"
264 );
265 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
266 if let Err(e) = crate::comms::queue_legacy_login_notification(
267 &state.db,
268 row.id,
269 &hostname,
270 &client_ip,
271 row.preferred_comms_channel,
272 )
273 .await
274 {
275 error!("Failed to queue legacy login notification: {:?}", e);
276 }
277 }
278 let handle = full_handle(&row.handle, &pds_hostname);
279 let is_active = row.deactivated_at.is_none() && !is_takendown;
280 let status = if is_takendown {
281 Some("takendown".to_string())
282 } else if row.deactivated_at.is_some() {
283 Some("deactivated".to_string())
284 } else {
285 None
286 };
287 Json(CreateSessionOutput {
288 access_jwt: access_meta.token,
289 refresh_jwt: refresh_meta.token,
290 handle,
291 did: row.did,
292 did_doc,
293 email: row.email,
294 email_confirmed: Some(row.email_verified),
295 active: Some(is_active),
296 status,
297 })
298 .into_response()
299}
300
301pub async fn get_session(
302 State(state): State<AppState>,
303 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
304) -> Response {
305 let permissions = auth_user.permissions();
306 let can_read_email = permissions.allows_email_read();
307
308 let did_for_doc = auth_user.did.clone();
309 let did_resolver = state.did_resolver.clone();
310 let (db_result, did_doc) = tokio::join!(
311 sqlx::query!(
312 r#"SELECT
313 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
314 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
315 discord_verified, telegram_verified, signal_verified
316 FROM users WHERE did = $1"#,
317 auth_user.did
318 )
319 .fetch_optional(&state.db),
320 did_resolver.resolve_did_document(&did_for_doc)
321 );
322 match db_result {
323 Ok(Some(row)) => {
324 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
325 crate::comms::CommsChannel::Email => ("email", row.email_verified),
326 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
327 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
328 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
329 };
330 let pds_hostname =
331 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
332 let handle = full_handle(&row.handle, &pds_hostname);
333 let is_takendown = row.takedown_ref.is_some();
334 let is_active = row.deactivated_at.is_none() && !is_takendown;
335 let email_value = if can_read_email {
336 row.email.clone()
337 } else {
338 None
339 };
340 let email_confirmed_value = can_read_email && row.email_verified;
341 let mut response = json!({
342 "handle": handle,
343 "did": auth_user.did,
344 "active": is_active,
345 "preferredChannel": preferred_channel,
346 "preferredChannelVerified": preferred_channel_verified,
347 "preferredLocale": row.preferred_locale,
348 "isAdmin": row.is_admin
349 });
350 if can_read_email {
351 response["email"] = json!(email_value);
352 response["emailConfirmed"] = json!(email_confirmed_value);
353 }
354 if is_takendown {
355 response["status"] = json!("takendown");
356 } else if row.deactivated_at.is_some() {
357 response["status"] = json!("deactivated");
358 }
359 if let Some(doc) = did_doc {
360 response["didDoc"] = doc;
361 }
362 Json(response)
363 .into_response()
364 }
365 Ok(None) => ApiError::AuthenticationFailed.into_response(),
366 Err(e) => {
367 error!("Database error in get_session: {:?}", e);
368 ApiError::InternalError.into_response()
369 }
370 }
371}
372
373pub async fn delete_session(
374 State(state): State<AppState>,
375 headers: axum::http::HeaderMap,
376) -> Response {
377 let token = match crate::auth::extract_bearer_token_from_header(
378 headers.get("Authorization").and_then(|h| h.to_str().ok()),
379 ) {
380 Some(t) => t,
381 None => return ApiError::AuthenticationRequired.into_response(),
382 };
383 let jti = match crate::auth::get_jti_from_token(&token) {
384 Ok(jti) => jti,
385 Err(_) => return ApiError::AuthenticationFailed.into_response(),
386 };
387 let did = crate::auth::get_did_from_token(&token).ok();
388 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
389 .execute(&state.db)
390 .await
391 {
392 Ok(res) if res.rows_affected() > 0 => {
393 if let Some(did) = did {
394 let session_cache_key = format!("auth:session:{}:{}", did, jti);
395 let _ = state.cache.delete(&session_cache_key).await;
396 }
397 Json(json!({})).into_response()
398 }
399 Ok(_) => ApiError::AuthenticationFailed.into_response(),
400 Err(e) => {
401 error!("Database error in delete_session: {:?}", e);
402 ApiError::AuthenticationFailed.into_response()
403 }
404 }
405}
406
407pub async fn refresh_session(
408 State(state): State<AppState>,
409 headers: axum::http::HeaderMap,
410) -> Response {
411 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
412 if !state
413 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
414 .await
415 {
416 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
417 return (
418 axum::http::StatusCode::TOO_MANY_REQUESTS,
419 axum::Json(serde_json::json!({
420 "error": "RateLimitExceeded",
421 "message": "Too many requests. Please try again later."
422 })),
423 )
424 .into_response();
425 }
426 let refresh_token = match crate::auth::extract_bearer_token_from_header(
427 headers.get("Authorization").and_then(|h| h.to_str().ok()),
428 ) {
429 Some(t) => t,
430 None => return ApiError::AuthenticationRequired.into_response(),
431 };
432 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
433 Ok(jti) => jti,
434 Err(_) => {
435 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
436 .into_response();
437 }
438 };
439 let mut tx = match state.db.begin().await {
440 Ok(tx) => tx,
441 Err(e) => {
442 error!("Failed to begin transaction: {:?}", e);
443 return ApiError::InternalError.into_response();
444 }
445 };
446 if let Ok(Some(session_id)) = sqlx::query_scalar!(
447 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
448 refresh_jti
449 )
450 .fetch_optional(&mut *tx)
451 .await
452 {
453 warn!(
454 "Refresh token reuse detected! Revoking token family for session_id: {}",
455 session_id
456 );
457 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
458 .execute(&mut *tx)
459 .await;
460 let _ = tx.commit().await;
461 return ApiError::ExpiredTokenMsg(
462 "Refresh token has been revoked due to suspected compromise".into(),
463 )
464 .into_response();
465 }
466 let session_row = match sqlx::query!(
467 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
468 FROM session_tokens st
469 JOIN users u ON st.did = u.did
470 JOIN user_keys k ON u.id = k.user_id
471 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
472 FOR UPDATE OF st"#,
473 refresh_jti
474 )
475 .fetch_optional(&mut *tx)
476 .await
477 {
478 Ok(Some(row)) => row,
479 Ok(None) => {
480 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
481 .into_response();
482 }
483 Err(e) => {
484 error!("Database error fetching session: {:?}", e);
485 return ApiError::InternalError.into_response();
486 }
487 };
488 let key_bytes =
489 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
490 Ok(k) => k,
491 Err(e) => {
492 error!("Failed to decrypt user key: {:?}", e);
493 return ApiError::InternalError.into_response();
494 }
495 };
496 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
497 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
498 }
499 let new_access_meta = match crate::auth::create_access_token_with_delegation(
500 &session_row.did,
501 &key_bytes,
502 session_row.scope.as_deref(),
503 session_row.controller_did.as_deref(),
504 ) {
505 Ok(m) => m,
506 Err(e) => {
507 error!("Failed to create access token: {:?}", e);
508 return ApiError::InternalError.into_response();
509 }
510 };
511 let new_refresh_meta =
512 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
513 Ok(m) => m,
514 Err(e) => {
515 error!("Failed to create refresh token: {:?}", e);
516 return ApiError::InternalError.into_response();
517 }
518 };
519 match sqlx::query!(
520 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
521 refresh_jti,
522 session_row.id
523 )
524 .execute(&mut *tx)
525 .await
526 {
527 Ok(result) if result.rows_affected() == 0 => {
528 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
529 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
530 .execute(&mut *tx)
531 .await;
532 let _ = tx.commit().await;
533 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
534 }
535 Err(e) => {
536 error!("Failed to record used refresh token: {:?}", e);
537 return ApiError::InternalError.into_response();
538 }
539 Ok(_) => {}
540 }
541 if let Err(e) = sqlx::query!(
542 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
543 new_access_meta.jti,
544 new_refresh_meta.jti,
545 new_access_meta.expires_at,
546 new_refresh_meta.expires_at,
547 session_row.id
548 )
549 .execute(&mut *tx)
550 .await
551 {
552 error!("Database error updating session: {:?}", e);
553 return ApiError::InternalError.into_response();
554 }
555 if let Err(e) = tx.commit().await {
556 error!("Failed to commit transaction: {:?}", e);
557 return ApiError::InternalError.into_response();
558 }
559 let did_for_doc = session_row.did.clone();
560 let did_resolver = state.did_resolver.clone();
561 let (db_result, did_doc) = tokio::join!(
562 sqlx::query!(
563 r#"SELECT
564 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
565 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
566 discord_verified, telegram_verified, signal_verified
567 FROM users WHERE did = $1"#,
568 session_row.did
569 )
570 .fetch_optional(&state.db),
571 did_resolver.resolve_did_document(&did_for_doc)
572 );
573 match db_result {
574 Ok(Some(u)) => {
575 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
576 crate::comms::CommsChannel::Email => ("email", u.email_verified),
577 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
578 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
579 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
580 };
581 let pds_hostname =
582 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
583 let handle = full_handle(&u.handle, &pds_hostname);
584 let is_takendown = u.takedown_ref.is_some();
585 let is_active = u.deactivated_at.is_none() && !is_takendown;
586 let mut response = json!({
587 "accessJwt": new_access_meta.token,
588 "refreshJwt": new_refresh_meta.token,
589 "handle": handle,
590 "did": session_row.did,
591 "email": u.email,
592 "emailConfirmed": u.email_verified,
593 "preferredChannel": preferred_channel,
594 "preferredChannelVerified": preferred_channel_verified,
595 "preferredLocale": u.preferred_locale,
596 "isAdmin": u.is_admin,
597 "active": is_active
598 });
599 if let Some(doc) = did_doc {
600 response["didDoc"] = doc;
601 }
602 if is_takendown {
603 response["status"] = json!("takendown");
604 } else if u.deactivated_at.is_some() {
605 response["status"] = json!("deactivated");
606 }
607 Json(response)
608 .into_response()
609 }
610 Ok(None) => {
611 error!("User not found for existing session: {}", session_row.did);
612 ApiError::InternalError.into_response()
613 }
614 Err(e) => {
615 error!("Database error fetching user: {:?}", e);
616 ApiError::InternalError.into_response()
617 }
618 }
619}
620
621#[derive(Deserialize)]
622#[serde(rename_all = "camelCase")]
623pub struct ConfirmSignupInput {
624 pub did: String,
625 pub verification_code: String,
626}
627
628#[derive(Serialize)]
629#[serde(rename_all = "camelCase")]
630pub struct ConfirmSignupOutput {
631 pub access_jwt: String,
632 pub refresh_jwt: String,
633 pub handle: String,
634 pub did: String,
635 pub email: Option<String>,
636 pub email_verified: bool,
637 pub preferred_channel: String,
638 pub preferred_channel_verified: bool,
639}
640
641pub async fn confirm_signup(
642 State(state): State<AppState>,
643 Json(input): Json<ConfirmSignupInput>,
644) -> Response {
645 info!("confirm_signup called for DID: {}", input.did);
646 let row = match sqlx::query!(
647 r#"SELECT
648 u.id, u.did, u.handle, u.email,
649 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
650 u.discord_id, u.telegram_username, u.signal_number,
651 k.key_bytes, k.encryption_version
652 FROM users u
653 JOIN user_keys k ON u.id = k.user_id
654 WHERE u.did = $1"#,
655 input.did
656 )
657 .fetch_optional(&state.db)
658 .await
659 {
660 Ok(Some(row)) => row,
661 Ok(None) => {
662 warn!("User not found for confirm_signup: {}", input.did);
663 return ApiError::InvalidRequest("Invalid DID or verification code".into())
664 .into_response();
665 }
666 Err(e) => {
667 error!("Database error in confirm_signup: {:?}", e);
668 return ApiError::InternalError.into_response();
669 }
670 };
671
672 let (channel_str, identifier) = match row.channel {
673 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
674 crate::comms::CommsChannel::Discord => {
675 ("discord", row.discord_id.clone().unwrap_or_default())
676 }
677 crate::comms::CommsChannel::Telegram => (
678 "telegram",
679 row.telegram_username.clone().unwrap_or_default(),
680 ),
681 crate::comms::CommsChannel::Signal => {
682 ("signal", row.signal_number.clone().unwrap_or_default())
683 }
684 };
685
686 let normalized_token =
687 crate::auth::verification_token::normalize_token_input(&input.verification_code);
688 match crate::auth::verification_token::verify_signup_token(
689 &normalized_token,
690 channel_str,
691 &identifier,
692 ) {
693 Ok(token_data) => {
694 if token_data.did != input.did {
695 warn!(
696 "Token DID mismatch for confirm_signup: expected {}, got {}",
697 input.did, token_data.did
698 );
699 return ApiError::InvalidRequest("Invalid verification code".into())
700 .into_response();
701 }
702 }
703 Err(crate::auth::verification_token::VerifyError::Expired) => {
704 warn!("Verification code expired for user: {}", input.did);
705 return ApiError::ExpiredTokenMsg("Verification code has expired".into())
706 .into_response();
707 }
708 Err(e) => {
709 warn!("Invalid verification code for user {}: {:?}", input.did, e);
710 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
711 }
712 }
713
714 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
715 Ok(k) => k,
716 Err(e) => {
717 error!("Failed to decrypt user key: {:?}", e);
718 return ApiError::InternalError.into_response();
719 }
720 };
721 let verified_column = match row.channel {
722 crate::comms::CommsChannel::Email => "email_verified",
723 crate::comms::CommsChannel::Discord => "discord_verified",
724 crate::comms::CommsChannel::Telegram => "telegram_verified",
725 crate::comms::CommsChannel::Signal => "signal_verified",
726 };
727 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
728 if let Err(e) = sqlx::query(&update_query)
729 .bind(&input.did)
730 .execute(&state.db)
731 .await
732 {
733 error!("Failed to update verification status: {:?}", e);
734 return ApiError::InternalError.into_response();
735 }
736
737 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
738 Ok(m) => m,
739 Err(e) => {
740 error!("Failed to create access token: {:?}", e);
741 return ApiError::InternalError.into_response();
742 }
743 };
744 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
745 Ok(m) => m,
746 Err(e) => {
747 error!("Failed to create refresh token: {:?}", e);
748 return ApiError::InternalError.into_response();
749 }
750 };
751 let no_scope: Option<String> = None;
752 if let Err(e) = sqlx::query!(
753 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
754 row.did,
755 access_meta.jti,
756 refresh_meta.jti,
757 access_meta.expires_at,
758 refresh_meta.expires_at,
759 false,
760 false,
761 no_scope
762 )
763 .execute(&state.db)
764 .await
765 {
766 error!("Failed to insert session: {:?}", e);
767 return ApiError::InternalError.into_response();
768 }
769 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
770 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
771 warn!("Failed to enqueue welcome notification: {:?}", e);
772 }
773 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
774 let preferred_channel = match row.channel {
775 crate::comms::CommsChannel::Email => "email",
776 crate::comms::CommsChannel::Discord => "discord",
777 crate::comms::CommsChannel::Telegram => "telegram",
778 crate::comms::CommsChannel::Signal => "signal",
779 };
780 Json(ConfirmSignupOutput {
781 access_jwt: access_meta.token,
782 refresh_jwt: refresh_meta.token,
783 handle: row.handle,
784 did: row.did,
785 email: row.email,
786 email_verified,
787 preferred_channel: preferred_channel.to_string(),
788 preferred_channel_verified: true,
789 })
790 .into_response()
791}
792
793#[derive(Deserialize)]
794#[serde(rename_all = "camelCase")]
795pub struct ResendVerificationInput {
796 pub did: String,
797}
798
799pub async fn resend_verification(
800 State(state): State<AppState>,
801 Json(input): Json<ResendVerificationInput>,
802) -> Response {
803 info!("resend_verification called for DID: {}", input.did);
804 let row = match sqlx::query!(
805 r#"SELECT
806 id, handle, email,
807 preferred_comms_channel as "channel: crate::comms::CommsChannel",
808 discord_id, telegram_username, signal_number,
809 email_verified, discord_verified, telegram_verified, signal_verified
810 FROM users
811 WHERE did = $1"#,
812 input.did
813 )
814 .fetch_optional(&state.db)
815 .await
816 {
817 Ok(Some(row)) => row,
818 Ok(None) => {
819 return ApiError::InvalidRequest("User not found".into()).into_response();
820 }
821 Err(e) => {
822 error!("Database error in resend_verification: {:?}", e);
823 return ApiError::InternalError.into_response();
824 }
825 };
826 let is_verified =
827 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
828 if is_verified {
829 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
830 }
831
832 let (channel_str, recipient) = match row.channel {
833 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
834 crate::comms::CommsChannel::Discord => {
835 ("discord", row.discord_id.clone().unwrap_or_default())
836 }
837 crate::comms::CommsChannel::Telegram => (
838 "telegram",
839 row.telegram_username.clone().unwrap_or_default(),
840 ),
841 crate::comms::CommsChannel::Signal => {
842 ("signal", row.signal_number.clone().unwrap_or_default())
843 }
844 };
845
846 let verification_token =
847 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
848 let formatted_token =
849 crate::auth::verification_token::format_token_for_display(&verification_token);
850
851 if let Err(e) = crate::comms::enqueue_signup_verification(
852 &state.db,
853 row.id,
854 channel_str,
855 &recipient,
856 &formatted_token,
857 None,
858 )
859 .await
860 {
861 warn!("Failed to enqueue verification notification: {:?}", e);
862 }
863 Json(json!({"success": true})).into_response()
864}
865
866#[derive(Serialize)]
867#[serde(rename_all = "camelCase")]
868pub struct SessionInfo {
869 pub id: String,
870 pub session_type: String,
871 pub client_name: Option<String>,
872 pub created_at: String,
873 pub expires_at: String,
874 pub is_current: bool,
875}
876
877#[derive(Serialize)]
878#[serde(rename_all = "camelCase")]
879pub struct ListSessionsOutput {
880 pub sessions: Vec<SessionInfo>,
881}
882
883pub async fn list_sessions(
884 State(state): State<AppState>,
885 headers: HeaderMap,
886 auth: BearerAuth,
887) -> Response {
888 let current_jti = headers
889 .get("authorization")
890 .and_then(|v| v.to_str().ok())
891 .and_then(|v| v.strip_prefix("Bearer "))
892 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
893
894 let mut sessions: Vec<SessionInfo> = Vec::new();
895
896 let jwt_result = sqlx::query_as::<
897 _,
898 (
899 i32,
900 String,
901 chrono::DateTime<chrono::Utc>,
902 chrono::DateTime<chrono::Utc>,
903 ),
904 >(
905 r#"
906 SELECT id, access_jti, created_at, refresh_expires_at
907 FROM session_tokens
908 WHERE did = $1 AND refresh_expires_at > NOW()
909 ORDER BY created_at DESC
910 "#,
911 )
912 .bind(&auth.0.did)
913 .fetch_all(&state.db)
914 .await;
915
916 match jwt_result {
917 Ok(rows) => {
918 for (id, access_jti, created_at, expires_at) in rows {
919 sessions.push(SessionInfo {
920 id: format!("jwt:{}", id),
921 session_type: "legacy".to_string(),
922 client_name: None,
923 created_at: created_at.to_rfc3339(),
924 expires_at: expires_at.to_rfc3339(),
925 is_current: current_jti.as_ref() == Some(&access_jti),
926 });
927 }
928 }
929 Err(e) => {
930 error!("DB error fetching JWT sessions: {:?}", e);
931 return (
932 StatusCode::INTERNAL_SERVER_ERROR,
933 Json(json!({"error": "InternalError"})),
934 )
935 .into_response();
936 }
937 }
938
939 let oauth_result = sqlx::query_as::<
940 _,
941 (
942 i32,
943 String,
944 chrono::DateTime<chrono::Utc>,
945 chrono::DateTime<chrono::Utc>,
946 String,
947 ),
948 >(
949 r#"
950 SELECT id, token_id, created_at, expires_at, client_id
951 FROM oauth_token
952 WHERE did = $1 AND expires_at > NOW()
953 ORDER BY created_at DESC
954 "#,
955 )
956 .bind(&auth.0.did)
957 .fetch_all(&state.db)
958 .await;
959
960 match oauth_result {
961 Ok(rows) => {
962 for (id, token_id, created_at, expires_at, client_id) in rows {
963 let client_name = extract_client_name(&client_id);
964 let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id);
965 sessions.push(SessionInfo {
966 id: format!("oauth:{}", id),
967 session_type: "oauth".to_string(),
968 client_name: Some(client_name),
969 created_at: created_at.to_rfc3339(),
970 expires_at: expires_at.to_rfc3339(),
971 is_current: is_current_oauth,
972 });
973 }
974 }
975 Err(e) => {
976 error!("DB error fetching OAuth sessions: {:?}", e);
977 return (
978 StatusCode::INTERNAL_SERVER_ERROR,
979 Json(json!({"error": "InternalError"})),
980 )
981 .into_response();
982 }
983 }
984
985 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
986
987 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
988}
989
990fn extract_client_name(client_id: &str) -> String {
991 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
992 "Localhost App".to_string()
993 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
994 parsed.host_str().unwrap_or("Unknown App").to_string()
995 } else {
996 client_id.to_string()
997 }
998}
999
1000#[derive(Deserialize)]
1001#[serde(rename_all = "camelCase")]
1002pub struct RevokeSessionInput {
1003 pub session_id: String,
1004}
1005
1006pub async fn revoke_session(
1007 State(state): State<AppState>,
1008 auth: BearerAuth,
1009 Json(input): Json<RevokeSessionInput>,
1010) -> Response {
1011 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
1012 let session_id: i32 = match jwt_id.parse() {
1013 Ok(id) => id,
1014 Err(_) => {
1015 return (
1016 StatusCode::BAD_REQUEST,
1017 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1018 )
1019 .into_response();
1020 }
1021 };
1022 let session = sqlx::query_as::<_, (String,)>(
1023 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
1024 )
1025 .bind(session_id)
1026 .bind(&auth.0.did)
1027 .fetch_optional(&state.db)
1028 .await;
1029 let access_jti = match session {
1030 Ok(Some((jti,))) => jti,
1031 Ok(None) => {
1032 return (
1033 StatusCode::NOT_FOUND,
1034 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1035 )
1036 .into_response();
1037 }
1038 Err(e) => {
1039 error!("DB error in revoke_session: {:?}", e);
1040 return (
1041 StatusCode::INTERNAL_SERVER_ERROR,
1042 Json(json!({"error": "InternalError"})),
1043 )
1044 .into_response();
1045 }
1046 };
1047 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1048 .bind(session_id)
1049 .execute(&state.db)
1050 .await
1051 {
1052 error!("DB error deleting session: {:?}", e);
1053 return (
1054 StatusCode::INTERNAL_SERVER_ERROR,
1055 Json(json!({"error": "InternalError"})),
1056 )
1057 .into_response();
1058 }
1059 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
1060 if let Err(e) = state.cache.delete(&cache_key).await {
1061 warn!("Failed to invalidate session cache: {:?}", e);
1062 }
1063 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
1064 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1065 let session_id: i32 = match oauth_id.parse() {
1066 Ok(id) => id,
1067 Err(_) => {
1068 return (
1069 StatusCode::BAD_REQUEST,
1070 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
1071 )
1072 .into_response();
1073 }
1074 };
1075 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1076 .bind(session_id)
1077 .bind(&auth.0.did)
1078 .execute(&state.db)
1079 .await;
1080 match result {
1081 Ok(r) if r.rows_affected() == 0 => {
1082 return (
1083 StatusCode::NOT_FOUND,
1084 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1085 )
1086 .into_response();
1087 }
1088 Err(e) => {
1089 error!("DB error deleting OAuth session: {:?}", e);
1090 return (
1091 StatusCode::INTERNAL_SERVER_ERROR,
1092 Json(json!({"error": "InternalError"})),
1093 )
1094 .into_response();
1095 }
1096 _ => {}
1097 }
1098 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1099 } else {
1100 return (
1101 StatusCode::BAD_REQUEST,
1102 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1103 )
1104 .into_response();
1105 }
1106 (StatusCode::OK, Json(json!({}))).into_response()
1107}
1108
1109pub async fn revoke_all_sessions(
1110 State(state): State<AppState>,
1111 headers: HeaderMap,
1112 auth: BearerAuth,
1113) -> Response {
1114 let current_jti = headers
1115 .get("authorization")
1116 .and_then(|v| v.to_str().ok())
1117 .and_then(|v| v.strip_prefix("Bearer "))
1118 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1119
1120 if let Some(ref jti) = current_jti {
1121 if auth.0.is_oauth {
1122 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1123 .bind(&auth.0.did)
1124 .execute(&state.db)
1125 .await
1126 {
1127 error!("DB error revoking JWT sessions: {:?}", e);
1128 return (
1129 StatusCode::INTERNAL_SERVER_ERROR,
1130 Json(json!({"error": "InternalError"})),
1131 )
1132 .into_response();
1133 }
1134 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1135 .bind(&auth.0.did)
1136 .bind(jti)
1137 .execute(&state.db)
1138 .await
1139 {
1140 error!("DB error revoking OAuth sessions: {:?}", e);
1141 return (
1142 StatusCode::INTERNAL_SERVER_ERROR,
1143 Json(json!({"error": "InternalError"})),
1144 )
1145 .into_response();
1146 }
1147 } else {
1148 if let Err(e) =
1149 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1150 .bind(&auth.0.did)
1151 .bind(jti)
1152 .execute(&state.db)
1153 .await
1154 {
1155 error!("DB error revoking JWT sessions: {:?}", e);
1156 return (
1157 StatusCode::INTERNAL_SERVER_ERROR,
1158 Json(json!({"error": "InternalError"})),
1159 )
1160 .into_response();
1161 }
1162 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1163 .bind(&auth.0.did)
1164 .execute(&state.db)
1165 .await
1166 {
1167 error!("DB error revoking OAuth sessions: {:?}", e);
1168 return (
1169 StatusCode::INTERNAL_SERVER_ERROR,
1170 Json(json!({"error": "InternalError"})),
1171 )
1172 .into_response();
1173 }
1174 }
1175 } else {
1176 return (
1177 StatusCode::BAD_REQUEST,
1178 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1179 )
1180 .into_response();
1181 }
1182
1183 info!(did = %auth.0.did, "All other sessions revoked");
1184 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1185}
1186
1187#[derive(Serialize)]
1188#[serde(rename_all = "camelCase")]
1189pub struct LegacyLoginPreferenceOutput {
1190 pub allow_legacy_login: bool,
1191 pub has_mfa: bool,
1192}
1193
1194pub async fn get_legacy_login_preference(
1195 State(state): State<AppState>,
1196 auth: BearerAuth,
1197) -> Response {
1198 let result = sqlx::query!(
1199 r#"SELECT
1200 u.allow_legacy_login,
1201 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1202 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1203 FROM users u WHERE u.did = $1"#,
1204 auth.0.did
1205 )
1206 .fetch_optional(&state.db)
1207 .await;
1208
1209 match result {
1210 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1211 allow_legacy_login: row.allow_legacy_login,
1212 has_mfa: row.has_mfa,
1213 })
1214 .into_response(),
1215 Ok(None) => (
1216 StatusCode::NOT_FOUND,
1217 Json(json!({"error": "AccountNotFound"})),
1218 )
1219 .into_response(),
1220 Err(e) => {
1221 error!("DB error: {:?}", e);
1222 (
1223 StatusCode::INTERNAL_SERVER_ERROR,
1224 Json(json!({"error": "InternalError"})),
1225 )
1226 .into_response()
1227 }
1228 }
1229}
1230
1231#[derive(Deserialize)]
1232#[serde(rename_all = "camelCase")]
1233pub struct UpdateLegacyLoginInput {
1234 pub allow_legacy_login: bool,
1235}
1236
1237pub async fn update_legacy_login_preference(
1238 State(state): State<AppState>,
1239 auth: BearerAuth,
1240 Json(input): Json<UpdateLegacyLoginInput>,
1241) -> Response {
1242 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1243 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1244 .await;
1245 }
1246
1247 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1248 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1249 }
1250
1251 let result = sqlx::query!(
1252 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1253 input.allow_legacy_login,
1254 auth.0.did
1255 )
1256 .fetch_optional(&state.db)
1257 .await;
1258
1259 match result {
1260 Ok(Some(_)) => {
1261 info!(
1262 did = %auth.0.did,
1263 allow_legacy_login = input.allow_legacy_login,
1264 "Legacy login preference updated"
1265 );
1266 Json(json!({
1267 "allowLegacyLogin": input.allow_legacy_login
1268 }))
1269 .into_response()
1270 }
1271 Ok(None) => (
1272 StatusCode::NOT_FOUND,
1273 Json(json!({"error": "AccountNotFound"})),
1274 )
1275 .into_response(),
1276 Err(e) => {
1277 error!("DB error: {:?}", e);
1278 (
1279 StatusCode::INTERNAL_SERVER_ERROR,
1280 Json(json!({"error": "InternalError"})),
1281 )
1282 .into_response()
1283 }
1284 }
1285}
1286
1287use crate::comms::locale::VALID_LOCALES;
1288
1289#[derive(Deserialize)]
1290#[serde(rename_all = "camelCase")]
1291pub struct UpdateLocaleInput {
1292 pub preferred_locale: String,
1293}
1294
1295pub async fn update_locale(
1296 State(state): State<AppState>,
1297 auth: BearerAuth,
1298 Json(input): Json<UpdateLocaleInput>,
1299) -> Response {
1300 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1301 return (
1302 StatusCode::BAD_REQUEST,
1303 Json(json!({
1304 "error": "InvalidRequest",
1305 "message": format!("Invalid locale. Valid options: {}", VALID_LOCALES.join(", "))
1306 })),
1307 )
1308 .into_response();
1309 }
1310
1311 let result = sqlx::query!(
1312 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1313 input.preferred_locale,
1314 auth.0.did
1315 )
1316 .fetch_optional(&state.db)
1317 .await;
1318
1319 match result {
1320 Ok(Some(_)) => {
1321 info!(
1322 did = %auth.0.did,
1323 locale = %input.preferred_locale,
1324 "User locale preference updated"
1325 );
1326 Json(json!({
1327 "preferredLocale": input.preferred_locale
1328 }))
1329 .into_response()
1330 }
1331 Ok(None) => (
1332 StatusCode::NOT_FOUND,
1333 Json(json!({"error": "AccountNotFound"})),
1334 )
1335 .into_response(),
1336 Err(e) => {
1337 error!("DB error updating locale: {:?}", e);
1338 (
1339 StatusCode::INTERNAL_SERVER_ERROR,
1340 Json(json!({"error": "InternalError"})),
1341 )
1342 .into_response()
1343 }
1344 }
1345}