this repo has no description
1use crate::api::error::ApiError;
2use crate::api::{EmptyResponse, SuccessResponse};
3use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
4use crate::state::{AppState, RateLimitKind};
5use crate::types::{AccountState, Did, Handle, PlainPassword};
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 response::{IntoResponse, Response},
11};
12use bcrypt::verify;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use tracing::{error, info, warn};
16
17fn extract_client_ip(headers: &HeaderMap) -> String {
18 if let Some(forwarded) = headers.get("x-forwarded-for")
19 && let Ok(value) = forwarded.to_str()
20 && let Some(first_ip) = value.split(',').next()
21 {
22 return first_ip.trim().to_string();
23 }
24 if let Some(real_ip) = headers.get("x-real-ip")
25 && let Ok(value) = real_ip.to_str()
26 {
27 return value.trim().to_string();
28 }
29 "unknown".to_string()
30}
31
/// Canonicalizes a login identifier before it is used for the user lookup.
///
/// Surrounding whitespace is always removed. After that:
/// - anything containing `@` or starting with `did:` is returned unchanged
///   (case is preserved);
/// - a name without any `.` is lowercased and suffixed with `.{pds_hostname}`;
/// - everything else is simply lowercased.
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
    let trimmed = identifier.trim();

    let is_email_or_did = trimmed.contains('@') || trimmed.starts_with("did:");
    if is_email_or_did {
        return trimmed.to_owned();
    }

    let lowered = trimmed.to_lowercase();
    if trimmed.contains('.') {
        lowered
    } else {
        // Bare local name: qualify it with this server's hostname.
        format!("{lowered}.{pds_hostname}")
    }
}
42
/// Returns the handle exactly as stored, as an owned `String`.
///
/// The hostname argument is accepted but not used.
fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
    String::from(stored_handle)
}
46
47#[derive(Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct CreateSessionInput {
50 pub identifier: String,
51 pub password: PlainPassword,
52 #[serde(default)]
53 pub allow_takendown: bool,
54}
55
56#[derive(Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct CreateSessionOutput {
59 pub access_jwt: String,
60 pub refresh_jwt: String,
61 pub handle: Handle,
62 pub did: Did,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub did_doc: Option<serde_json::Value>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub email: Option<String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub email_confirmed: Option<bool>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub active: Option<bool>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub status: Option<String>,
73}
74
75pub async fn create_session(
76 State(state): State<AppState>,
77 headers: HeaderMap,
78 Json(input): Json<CreateSessionInput>,
79) -> Response {
80 info!(
81 "create_session called with identifier: {}",
82 input.identifier
83 );
84 let client_ip = extract_client_ip(&headers);
85 if !state
86 .check_rate_limit(RateLimitKind::Login, &client_ip)
87 .await
88 {
89 warn!(ip = %client_ip, "Login rate limit exceeded");
90 return ApiError::RateLimitExceeded(None).into_response();
91 }
92 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
93 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
94 info!(
95 "Normalized identifier: {} -> {}",
96 input.identifier, normalized_identifier
97 );
98 let row = match sqlx::query!(
99 r#"SELECT
100 u.id, u.did, u.handle, u.password_hash, u.email, u.deactivated_at, u.takedown_ref,
101 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
102 u.allow_legacy_login, u.migrated_to_pds,
103 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
104 k.key_bytes, k.encryption_version,
105 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
106 FROM users u
107 JOIN user_keys k ON u.id = k.user_id
108 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
109 normalized_identifier
110 )
111 .fetch_optional(&state.db)
112 .await
113 {
114 Ok(Some(row)) => row,
115 Ok(None) => {
116 let _ = verify(
117 &input.password,
118 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
119 );
120 warn!("User not found for login attempt");
121 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into()))
122 .into_response();
123 }
124 Err(e) => {
125 error!("Database error fetching user: {:?}", e);
126 return ApiError::InternalError(None).into_response();
127 }
128 };
129 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
130 Ok(k) => k,
131 Err(e) => {
132 error!("Failed to decrypt user key: {:?}", e);
133 return ApiError::InternalError(None).into_response();
134 }
135 };
136 let (password_valid, app_password_name, app_password_scopes, app_password_controller) = if row
137 .password_hash
138 .as_ref()
139 .map(|h| verify(&input.password, h).unwrap_or(false))
140 .unwrap_or(false)
141 {
142 (true, None, None, None)
143 } else {
144 let app_passwords = sqlx::query!(
145 "SELECT name, password_hash, scopes, created_by_controller_did FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
146 row.id
147 )
148 .fetch_all(&state.db)
149 .await
150 .unwrap_or_default();
151 let matched = app_passwords
152 .iter()
153 .find(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
154 match matched {
155 Some(app) => (
156 true,
157 Some(app.name.clone()),
158 app.scopes.clone(),
159 app.created_by_controller_did.clone(),
160 ),
161 None => (false, None, None, None),
162 }
163 };
164 if !password_valid {
165 warn!("Password verification failed for login attempt");
166 return ApiError::AuthenticationFailed(Some("Invalid identifier or password".into()))
167 .into_response();
168 }
169 let account_state = AccountState::from_db_fields(
170 row.deactivated_at,
171 row.takedown_ref.clone(),
172 row.migrated_to_pds.clone(),
173 None,
174 );
175 if account_state.is_takendown() && !input.allow_takendown {
176 warn!("Login attempt for takendown account: {}", row.did);
177 return ApiError::AccountTakedown.into_response();
178 }
179 let is_verified =
180 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
181 let is_delegated = crate::delegation::is_delegated_account(&state.db, &row.did)
182 .await
183 .unwrap_or(false);
184 if !is_verified && !is_delegated {
185 warn!("Login attempt for unverified account: {}", row.did);
186 return (
187 StatusCode::FORBIDDEN,
188 Json(json!({
189 "error": "AccountNotVerified",
190 "message": "Please verify your account before logging in",
191 "did": row.did
192 })),
193 )
194 .into_response();
195 }
196 let has_totp = row.totp_enabled.unwrap_or(false);
197 let is_legacy_login = has_totp;
198 if has_totp && !row.allow_legacy_login {
199 warn!("Legacy login blocked for TOTP-enabled account: {}", row.did);
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "MfaRequired",
204 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
205 "did": row.did
206 })),
207 )
208 .into_response();
209 }
210 let access_meta = match crate::auth::create_access_token_with_delegation(
211 &row.did,
212 &key_bytes,
213 app_password_scopes.as_deref(),
214 app_password_controller.as_deref(),
215 None,
216 ) {
217 Ok(m) => m,
218 Err(e) => {
219 error!("Failed to create access token: {:?}", e);
220 return ApiError::InternalError(None).into_response();
221 }
222 };
223 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
224 Ok(m) => m,
225 Err(e) => {
226 error!("Failed to create refresh token: {:?}", e);
227 return ApiError::InternalError(None).into_response();
228 }
229 };
230 let did_for_doc = row.did.clone();
231 let did_resolver = state.did_resolver.clone();
232 let (insert_result, did_doc) = tokio::join!(
233 sqlx::query!(
234 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope, controller_did, app_password_name) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
235 row.did,
236 access_meta.jti,
237 refresh_meta.jti,
238 access_meta.expires_at,
239 refresh_meta.expires_at,
240 is_legacy_login,
241 false,
242 app_password_scopes,
243 app_password_controller,
244 app_password_name
245 )
246 .execute(&state.db),
247 did_resolver.resolve_did_document(&did_for_doc)
248 );
249 if let Err(e) = insert_result {
250 error!("Failed to insert session: {:?}", e);
251 return ApiError::InternalError(None).into_response();
252 }
253 if is_legacy_login {
254 warn!(
255 did = %row.did,
256 ip = %client_ip,
257 "Legacy login on TOTP-enabled account - sending notification"
258 );
259 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
260 if let Err(e) = crate::comms::queue_legacy_login_notification(
261 &state.db,
262 row.id,
263 &hostname,
264 &client_ip,
265 row.preferred_comms_channel,
266 )
267 .await
268 {
269 error!("Failed to queue legacy login notification: {:?}", e);
270 }
271 }
272 let handle = full_handle(&row.handle, &pds_hostname);
273 let is_active = account_state.is_active();
274 let status = account_state.status_for_session().map(String::from);
275 Json(CreateSessionOutput {
276 access_jwt: access_meta.token,
277 refresh_jwt: refresh_meta.token,
278 handle: handle.into(),
279 did: row.did.into(),
280 did_doc,
281 email: row.email,
282 email_confirmed: Some(row.email_verified),
283 active: Some(is_active),
284 status,
285 })
286 .into_response()
287}
288
289pub async fn get_session(
290 State(state): State<AppState>,
291 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
292) -> Response {
293 let permissions = auth_user.permissions();
294 let can_read_email = permissions.allows_email_read();
295
296 let did_for_doc = auth_user.did.clone();
297 let did_resolver = state.did_resolver.clone();
298 let (db_result, did_doc) = tokio::join!(
299 sqlx::query!(
300 r#"SELECT
301 handle, email, email_verified, is_admin, deactivated_at, takedown_ref, preferred_locale,
302 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
303 discord_verified, telegram_verified, signal_verified, migrated_to_pds, migrated_at
304 FROM users WHERE did = $1"#,
305 &auth_user.did
306 )
307 .fetch_optional(&state.db),
308 did_resolver.resolve_did_document(&did_for_doc)
309 );
310 match db_result {
311 Ok(Some(row)) => {
312 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
313 crate::comms::CommsChannel::Email => ("email", row.email_verified),
314 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
315 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
316 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
317 };
318 let pds_hostname =
319 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
320 let handle = full_handle(&row.handle, &pds_hostname);
321 let account_state = AccountState::from_db_fields(
322 row.deactivated_at,
323 row.takedown_ref.clone(),
324 row.migrated_to_pds.clone(),
325 row.migrated_at,
326 );
327 let email_value = if can_read_email {
328 row.email.clone()
329 } else {
330 None
331 };
332 let email_confirmed_value = can_read_email && row.email_verified;
333 let mut response = json!({
334 "handle": handle,
335 "did": &auth_user.did,
336 "active": account_state.is_active(),
337 "preferredChannel": preferred_channel,
338 "preferredChannelVerified": preferred_channel_verified,
339 "preferredLocale": row.preferred_locale,
340 "isAdmin": row.is_admin
341 });
342 if can_read_email {
343 response["email"] = json!(email_value);
344 response["emailConfirmed"] = json!(email_confirmed_value);
345 }
346 if let Some(status) = account_state.status_for_session() {
347 response["status"] = json!(status);
348 }
349 if let AccountState::Migrated { to_pds, at } = &account_state {
350 response["migratedToPds"] = json!(to_pds);
351 response["migratedAt"] = json!(at);
352 }
353 if let Some(doc) = did_doc {
354 response["didDoc"] = doc;
355 }
356 Json(response).into_response()
357 }
358 Ok(None) => ApiError::AuthenticationFailed(None).into_response(),
359 Err(e) => {
360 error!("Database error in get_session: {:?}", e);
361 ApiError::InternalError(None).into_response()
362 }
363 }
364}
365
366pub async fn delete_session(
367 State(state): State<AppState>,
368 headers: axum::http::HeaderMap,
369 _auth: BearerAuth,
370) -> Response {
371 let extracted = match crate::auth::extract_auth_token_from_header(
372 headers.get("Authorization").and_then(|h| h.to_str().ok()),
373 ) {
374 Some(t) => t,
375 None => return ApiError::AuthenticationRequired.into_response(),
376 };
377 let jti = match crate::auth::get_jti_from_token(&extracted.token) {
378 Ok(jti) => jti,
379 Err(_) => return ApiError::AuthenticationFailed(None).into_response(),
380 };
381 let did = crate::auth::get_did_from_token(&extracted.token).ok();
382 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
383 .execute(&state.db)
384 .await
385 {
386 Ok(res) if res.rows_affected() > 0 => {
387 if let Some(did) = did {
388 let session_cache_key = format!("auth:session:{}:{}", did, jti);
389 let _ = state.cache.delete(&session_cache_key).await;
390 }
391 EmptyResponse::ok().into_response()
392 }
393 Ok(_) => ApiError::AuthenticationFailed(None).into_response(),
394 Err(e) => {
395 error!("Database error in delete_session: {:?}", e);
396 ApiError::AuthenticationFailed(None).into_response()
397 }
398 }
399}
400
401pub async fn refresh_session(
402 State(state): State<AppState>,
403 headers: axum::http::HeaderMap,
404) -> Response {
405 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
406 if !state
407 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
408 .await
409 {
410 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
411 return ApiError::RateLimitExceeded(None).into_response();
412 }
413 let extracted = match crate::auth::extract_auth_token_from_header(
414 headers.get("Authorization").and_then(|h| h.to_str().ok()),
415 ) {
416 Some(t) => t,
417 None => return ApiError::AuthenticationRequired.into_response(),
418 };
419 let refresh_token = extracted.token;
420 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
421 Ok(jti) => jti,
422 Err(_) => {
423 return ApiError::AuthenticationFailed(Some("Invalid token format".into()))
424 .into_response();
425 }
426 };
427 let mut tx = match state.db.begin().await {
428 Ok(tx) => tx,
429 Err(e) => {
430 error!("Failed to begin transaction: {:?}", e);
431 return ApiError::InternalError(None).into_response();
432 }
433 };
434 if let Ok(Some(session_id)) = sqlx::query_scalar!(
435 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
436 refresh_jti
437 )
438 .fetch_optional(&mut *tx)
439 .await
440 {
441 warn!(
442 "Refresh token reuse detected! Revoking token family for session_id: {}",
443 session_id
444 );
445 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
446 .execute(&mut *tx)
447 .await;
448 let _ = tx.commit().await;
449 return ApiError::AuthenticationFailed(Some(
450 "Refresh token has been revoked due to suspected compromise".into(),
451 ))
452 .into_response();
453 }
454 let session_row = match sqlx::query!(
455 r#"SELECT st.id, st.did, st.scope, st.controller_did, k.key_bytes, k.encryption_version
456 FROM session_tokens st
457 JOIN users u ON st.did = u.did
458 JOIN user_keys k ON u.id = k.user_id
459 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
460 FOR UPDATE OF st"#,
461 refresh_jti
462 )
463 .fetch_optional(&mut *tx)
464 .await
465 {
466 Ok(Some(row)) => row,
467 Ok(None) => {
468 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into()))
469 .into_response();
470 }
471 Err(e) => {
472 error!("Database error fetching session: {:?}", e);
473 return ApiError::InternalError(None).into_response();
474 }
475 };
476 let key_bytes =
477 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
478 Ok(k) => k,
479 Err(e) => {
480 error!("Failed to decrypt user key: {:?}", e);
481 return ApiError::InternalError(None).into_response();
482 }
483 };
484 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
485 return ApiError::AuthenticationFailed(Some("Invalid refresh token".into()))
486 .into_response();
487 }
488 let new_access_meta = match crate::auth::create_access_token_with_delegation(
489 &session_row.did,
490 &key_bytes,
491 session_row.scope.as_deref(),
492 session_row.controller_did.as_deref(),
493 None,
494 ) {
495 Ok(m) => m,
496 Err(e) => {
497 error!("Failed to create access token: {:?}", e);
498 return ApiError::InternalError(None).into_response();
499 }
500 };
501 let new_refresh_meta =
502 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
503 Ok(m) => m,
504 Err(e) => {
505 error!("Failed to create refresh token: {:?}", e);
506 return ApiError::InternalError(None).into_response();
507 }
508 };
509 match sqlx::query!(
510 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
511 refresh_jti,
512 session_row.id
513 )
514 .execute(&mut *tx)
515 .await
516 {
517 Ok(result) if result.rows_affected() == 0 => {
518 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
519 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
520 .execute(&mut *tx)
521 .await;
522 let _ = tx.commit().await;
523 return ApiError::AuthenticationFailed(Some("Refresh token has been revoked due to suspected compromise".into())).into_response();
524 }
525 Err(e) => {
526 error!("Failed to record used refresh token: {:?}", e);
527 return ApiError::InternalError(None).into_response();
528 }
529 Ok(_) => {}
530 }
531 if let Err(e) = sqlx::query!(
532 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
533 new_access_meta.jti,
534 new_refresh_meta.jti,
535 new_access_meta.expires_at,
536 new_refresh_meta.expires_at,
537 session_row.id
538 )
539 .execute(&mut *tx)
540 .await
541 {
542 error!("Database error updating session: {:?}", e);
543 return ApiError::InternalError(None).into_response();
544 }
545 if let Err(e) = tx.commit().await {
546 error!("Failed to commit transaction: {:?}", e);
547 return ApiError::InternalError(None).into_response();
548 }
549 let did_for_doc = session_row.did.clone();
550 let did_resolver = state.did_resolver.clone();
551 let (db_result, did_doc) = tokio::join!(
552 sqlx::query!(
553 r#"SELECT
554 handle, email, email_verified, is_admin, preferred_locale, deactivated_at, takedown_ref,
555 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
556 discord_verified, telegram_verified, signal_verified
557 FROM users WHERE did = $1"#,
558 session_row.did
559 )
560 .fetch_optional(&state.db),
561 did_resolver.resolve_did_document(&did_for_doc)
562 );
563 match db_result {
564 Ok(Some(u)) => {
565 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
566 crate::comms::CommsChannel::Email => ("email", u.email_verified),
567 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
568 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
569 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
570 };
571 let pds_hostname =
572 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
573 let handle = full_handle(&u.handle, &pds_hostname);
574 let account_state =
575 AccountState::from_db_fields(u.deactivated_at, u.takedown_ref.clone(), None, None);
576 let mut response = json!({
577 "accessJwt": new_access_meta.token,
578 "refreshJwt": new_refresh_meta.token,
579 "handle": handle,
580 "did": session_row.did,
581 "email": u.email,
582 "emailConfirmed": u.email_verified,
583 "preferredChannel": preferred_channel,
584 "preferredChannelVerified": preferred_channel_verified,
585 "preferredLocale": u.preferred_locale,
586 "isAdmin": u.is_admin,
587 "active": account_state.is_active()
588 });
589 if let Some(doc) = did_doc {
590 response["didDoc"] = doc;
591 }
592 if let Some(status) = account_state.status_for_session() {
593 response["status"] = json!(status);
594 }
595 Json(response).into_response()
596 }
597 Ok(None) => {
598 error!("User not found for existing session: {}", session_row.did);
599 ApiError::InternalError(None).into_response()
600 }
601 Err(e) => {
602 error!("Database error fetching user: {:?}", e);
603 ApiError::InternalError(None).into_response()
604 }
605 }
606}
607
608#[derive(Deserialize)]
609#[serde(rename_all = "camelCase")]
610pub struct ConfirmSignupInput {
611 pub did: Did,
612 pub verification_code: String,
613}
614
615#[derive(Serialize)]
616#[serde(rename_all = "camelCase")]
617pub struct ConfirmSignupOutput {
618 pub access_jwt: String,
619 pub refresh_jwt: String,
620 pub handle: Handle,
621 pub did: Did,
622 pub email: Option<String>,
623 pub email_verified: bool,
624 pub preferred_channel: String,
625 pub preferred_channel_verified: bool,
626}
627
628pub async fn confirm_signup(
629 State(state): State<AppState>,
630 Json(input): Json<ConfirmSignupInput>,
631) -> Response {
632 info!("confirm_signup called for DID: {}", input.did);
633 let row = match sqlx::query!(
634 r#"SELECT
635 u.id, u.did, u.handle, u.email,
636 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
637 u.discord_id, u.telegram_username, u.signal_number,
638 k.key_bytes, k.encryption_version
639 FROM users u
640 JOIN user_keys k ON u.id = k.user_id
641 WHERE u.did = $1"#,
642 input.did.as_str()
643 )
644 .fetch_optional(&state.db)
645 .await
646 {
647 Ok(Some(row)) => row,
648 Ok(None) => {
649 warn!("User not found for confirm_signup: {}", input.did);
650 return ApiError::InvalidRequest("Invalid DID or verification code".into())
651 .into_response();
652 }
653 Err(e) => {
654 error!("Database error in confirm_signup: {:?}", e);
655 return ApiError::InternalError(None).into_response();
656 }
657 };
658
659 let (channel_str, identifier) = match row.channel {
660 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
661 crate::comms::CommsChannel::Discord => {
662 ("discord", row.discord_id.clone().unwrap_or_default())
663 }
664 crate::comms::CommsChannel::Telegram => (
665 "telegram",
666 row.telegram_username.clone().unwrap_or_default(),
667 ),
668 crate::comms::CommsChannel::Signal => {
669 ("signal", row.signal_number.clone().unwrap_or_default())
670 }
671 };
672
673 let normalized_token =
674 crate::auth::verification_token::normalize_token_input(&input.verification_code);
675 match crate::auth::verification_token::verify_signup_token(
676 &normalized_token,
677 channel_str,
678 &identifier,
679 ) {
680 Ok(token_data) => {
681 if token_data.did != input.did.as_str() {
682 warn!(
683 "Token DID mismatch for confirm_signup: expected {}, got {}",
684 input.did, token_data.did
685 );
686 return ApiError::InvalidRequest("Invalid verification code".into())
687 .into_response();
688 }
689 }
690 Err(crate::auth::verification_token::VerifyError::Expired) => {
691 warn!("Verification code expired for user: {}", input.did);
692 return ApiError::ExpiredToken(Some("Verification code has expired".into()))
693 .into_response();
694 }
695 Err(e) => {
696 warn!("Invalid verification code for user {}: {:?}", input.did, e);
697 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
698 }
699 }
700
701 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
702 Ok(k) => k,
703 Err(e) => {
704 error!("Failed to decrypt user key: {:?}", e);
705 return ApiError::InternalError(None).into_response();
706 }
707 };
708
709 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
710 Ok(m) => m,
711 Err(e) => {
712 error!("Failed to create access token: {:?}", e);
713 return ApiError::InternalError(None).into_response();
714 }
715 };
716 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
717 Ok(m) => m,
718 Err(e) => {
719 error!("Failed to create refresh token: {:?}", e);
720 return ApiError::InternalError(None).into_response();
721 }
722 };
723
724 let mut tx = match state.db.begin().await {
725 Ok(tx) => tx,
726 Err(e) => {
727 error!("Failed to begin transaction: {:?}", e);
728 return ApiError::InternalError(None).into_response();
729 }
730 };
731
732 let verified_column = match row.channel {
733 crate::comms::CommsChannel::Email => "email_verified",
734 crate::comms::CommsChannel::Discord => "discord_verified",
735 crate::comms::CommsChannel::Telegram => "telegram_verified",
736 crate::comms::CommsChannel::Signal => "signal_verified",
737 };
738 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
739 if let Err(e) = sqlx::query(&update_query)
740 .bind(input.did.as_str())
741 .execute(&mut *tx)
742 .await
743 {
744 error!("Failed to update verification status: {:?}", e);
745 return ApiError::InternalError(None).into_response();
746 }
747
748 let no_scope: Option<String> = None;
749 if let Err(e) = sqlx::query!(
750 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
751 row.did,
752 access_meta.jti,
753 refresh_meta.jti,
754 access_meta.expires_at,
755 refresh_meta.expires_at,
756 false,
757 false,
758 no_scope
759 )
760 .execute(&mut *tx)
761 .await
762 {
763 error!("Failed to insert session: {:?}", e);
764 return ApiError::InternalError(None).into_response();
765 }
766
767 if let Err(e) = tx.commit().await {
768 error!("Failed to commit transaction: {:?}", e);
769 return ApiError::InternalError(None).into_response();
770 }
771
772 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
773 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
774 warn!("Failed to enqueue welcome notification: {:?}", e);
775 }
776 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
777 let preferred_channel = match row.channel {
778 crate::comms::CommsChannel::Email => "email",
779 crate::comms::CommsChannel::Discord => "discord",
780 crate::comms::CommsChannel::Telegram => "telegram",
781 crate::comms::CommsChannel::Signal => "signal",
782 };
783 Json(ConfirmSignupOutput {
784 access_jwt: access_meta.token,
785 refresh_jwt: refresh_meta.token,
786 handle: row.handle.into(),
787 did: row.did.into(),
788 email: row.email,
789 email_verified,
790 preferred_channel: preferred_channel.to_string(),
791 preferred_channel_verified: true,
792 })
793 .into_response()
794}
795
796#[derive(Deserialize)]
797#[serde(rename_all = "camelCase")]
798pub struct ResendVerificationInput {
799 pub did: Did,
800}
801
802pub async fn resend_verification(
803 State(state): State<AppState>,
804 Json(input): Json<ResendVerificationInput>,
805) -> Response {
806 info!("resend_verification called for DID: {}", input.did);
807 let row = match sqlx::query!(
808 r#"SELECT
809 id, handle, email,
810 preferred_comms_channel as "channel: crate::comms::CommsChannel",
811 discord_id, telegram_username, signal_number,
812 email_verified, discord_verified, telegram_verified, signal_verified
813 FROM users
814 WHERE did = $1"#,
815 input.did.as_str()
816 )
817 .fetch_optional(&state.db)
818 .await
819 {
820 Ok(Some(row)) => row,
821 Ok(None) => {
822 return ApiError::InvalidRequest("User not found".into()).into_response();
823 }
824 Err(e) => {
825 error!("Database error in resend_verification: {:?}", e);
826 return ApiError::InternalError(None).into_response();
827 }
828 };
829 let is_verified =
830 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
831 if is_verified {
832 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
833 }
834
835 let (channel_str, recipient) = match row.channel {
836 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
837 crate::comms::CommsChannel::Discord => {
838 ("discord", row.discord_id.clone().unwrap_or_default())
839 }
840 crate::comms::CommsChannel::Telegram => (
841 "telegram",
842 row.telegram_username.clone().unwrap_or_default(),
843 ),
844 crate::comms::CommsChannel::Signal => {
845 ("signal", row.signal_number.clone().unwrap_or_default())
846 }
847 };
848
849 let verification_token =
850 crate::auth::verification_token::generate_signup_token(&input.did, channel_str, &recipient);
851 let formatted_token =
852 crate::auth::verification_token::format_token_for_display(&verification_token);
853
854 if let Err(e) = crate::comms::enqueue_signup_verification(
855 &state.db,
856 row.id,
857 channel_str,
858 &recipient,
859 &formatted_token,
860 None,
861 )
862 .await
863 {
864 warn!("Failed to enqueue verification notification: {:?}", e);
865 }
866 SuccessResponse::ok().into_response()
867}
868
869#[derive(Serialize)]
870#[serde(rename_all = "camelCase")]
871pub struct SessionInfo {
872 pub id: String,
873 pub session_type: String,
874 pub client_name: Option<String>,
875 pub created_at: String,
876 pub expires_at: String,
877 pub is_current: bool,
878}
879
880#[derive(Serialize)]
881#[serde(rename_all = "camelCase")]
882pub struct ListSessionsOutput {
883 pub sessions: Vec<SessionInfo>,
884}
885
886pub async fn list_sessions(
887 State(state): State<AppState>,
888 headers: HeaderMap,
889 auth: BearerAuth,
890) -> Response {
891 let current_jti = headers
892 .get("authorization")
893 .and_then(|v| v.to_str().ok())
894 .and_then(|v| v.strip_prefix("Bearer "))
895 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
896
897 let jwt_rows = match sqlx::query_as::<
898 _,
899 (
900 i32,
901 String,
902 chrono::DateTime<chrono::Utc>,
903 chrono::DateTime<chrono::Utc>,
904 ),
905 >(
906 r#"
907 SELECT id, access_jti, created_at, refresh_expires_at
908 FROM session_tokens
909 WHERE did = $1 AND refresh_expires_at > NOW()
910 ORDER BY created_at DESC
911 "#,
912 )
913 .bind(&auth.0.did)
914 .fetch_all(&state.db)
915 .await
916 {
917 Ok(rows) => rows,
918 Err(e) => {
919 error!("DB error fetching JWT sessions: {:?}", e);
920 return ApiError::InternalError(None).into_response();
921 }
922 };
923
924 let oauth_rows = match sqlx::query_as::<
925 _,
926 (
927 i32,
928 String,
929 chrono::DateTime<chrono::Utc>,
930 chrono::DateTime<chrono::Utc>,
931 String,
932 ),
933 >(
934 r#"
935 SELECT id, token_id, created_at, expires_at, client_id
936 FROM oauth_token
937 WHERE did = $1 AND expires_at > NOW()
938 ORDER BY created_at DESC
939 "#,
940 )
941 .bind(&auth.0.did)
942 .fetch_all(&state.db)
943 .await
944 {
945 Ok(rows) => rows,
946 Err(e) => {
947 error!("DB error fetching OAuth sessions: {:?}", e);
948 return ApiError::InternalError(None).into_response();
949 }
950 };
951
952 let jwt_sessions = jwt_rows
953 .into_iter()
954 .map(|(id, access_jti, created_at, expires_at)| SessionInfo {
955 id: format!("jwt:{}", id),
956 session_type: "legacy".to_string(),
957 client_name: None,
958 created_at: created_at.to_rfc3339(),
959 expires_at: expires_at.to_rfc3339(),
960 is_current: current_jti.as_ref() == Some(&access_jti),
961 });
962
963 let is_oauth = auth.0.is_oauth;
964 let oauth_sessions =
965 oauth_rows
966 .into_iter()
967 .map(|(id, token_id, created_at, expires_at, client_id)| {
968 let client_name = extract_client_name(&client_id);
969 let is_current_oauth = is_oauth && current_jti.as_ref() == Some(&token_id);
970 SessionInfo {
971 id: format!("oauth:{}", id),
972 session_type: "oauth".to_string(),
973 client_name: Some(client_name),
974 created_at: created_at.to_rfc3339(),
975 expires_at: expires_at.to_rfc3339(),
976 is_current: is_current_oauth,
977 }
978 });
979
980 let mut sessions: Vec<SessionInfo> = jwt_sessions.chain(oauth_sessions).collect();
981 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
982
983 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
984}
985
986fn extract_client_name(client_id: &str) -> String {
987 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
988 "Localhost App".to_string()
989 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
990 parsed.host_str().unwrap_or("Unknown App").to_string()
991 } else {
992 client_id.to_string()
993 }
994}
995
/// Request body accepted by `revoke_session`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RevokeSessionInput {
    /// Identifier of the session to revoke (JSON key `sessionId`).
    ///
    /// `revoke_session` expects the form `jwt:<id>` or `oauth:<id>`, where
    /// `<id>` parses as an `i32`.
    pub session_id: String,
}
1001
1002pub async fn revoke_session(
1003 State(state): State<AppState>,
1004 auth: BearerAuth,
1005 Json(input): Json<RevokeSessionInput>,
1006) -> Response {
1007 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
1008 let Ok(session_id) = jwt_id.parse::<i32>() else {
1009 return ApiError::InvalidRequest("Invalid session ID".into()).into_response();
1010 };
1011 let session = sqlx::query_as::<_, (String,)>(
1012 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
1013 )
1014 .bind(session_id)
1015 .bind(&auth.0.did)
1016 .fetch_optional(&state.db)
1017 .await;
1018 let access_jti = match session {
1019 Ok(Some((jti,))) => jti,
1020 Ok(None) => {
1021 return ApiError::SessionNotFound.into_response();
1022 }
1023 Err(e) => {
1024 error!("DB error in revoke_session: {:?}", e);
1025 return ApiError::InternalError(None).into_response();
1026 }
1027 };
1028 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
1029 .bind(session_id)
1030 .execute(&state.db)
1031 .await
1032 {
1033 error!("DB error deleting session: {:?}", e);
1034 return ApiError::InternalError(None).into_response();
1035 }
1036 let cache_key = format!("auth:session:{}:{}", &auth.0.did, access_jti);
1037 if let Err(e) = state.cache.delete(&cache_key).await {
1038 warn!("Failed to invalidate session cache: {:?}", e);
1039 }
1040 info!(did = %&auth.0.did, session_id = %session_id, "JWT session revoked");
1041 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
1042 let Ok(session_id) = oauth_id.parse::<i32>() else {
1043 return ApiError::InvalidRequest("Invalid session ID".into()).into_response();
1044 };
1045 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
1046 .bind(session_id)
1047 .bind(&auth.0.did)
1048 .execute(&state.db)
1049 .await;
1050 match result {
1051 Ok(r) if r.rows_affected() == 0 => {
1052 return ApiError::SessionNotFound.into_response();
1053 }
1054 Err(e) => {
1055 error!("DB error deleting OAuth session: {:?}", e);
1056 return ApiError::InternalError(None).into_response();
1057 }
1058 _ => {}
1059 }
1060 info!(did = %&auth.0.did, session_id = %session_id, "OAuth session revoked");
1061 } else {
1062 return ApiError::InvalidRequest("Invalid session ID format".into()).into_response();
1063 }
1064 EmptyResponse::ok().into_response()
1065}
1066
1067pub async fn revoke_all_sessions(
1068 State(state): State<AppState>,
1069 headers: HeaderMap,
1070 auth: BearerAuth,
1071) -> Response {
1072 let current_jti = crate::auth::extract_auth_token_from_header(
1073 headers.get("authorization").and_then(|v| v.to_str().ok()),
1074 )
1075 .and_then(|extracted| crate::auth::get_jti_from_token(&extracted.token).ok());
1076
1077 let Some(ref jti) = current_jti else {
1078 return ApiError::InvalidToken(None).into_response();
1079 };
1080
1081 let mut tx = match state.db.begin().await {
1082 Ok(tx) => tx,
1083 Err(e) => {
1084 error!("Failed to begin transaction: {:?}", e);
1085 return ApiError::InternalError(None).into_response();
1086 }
1087 };
1088
1089 if auth.0.is_oauth {
1090 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1091 .bind(&auth.0.did)
1092 .execute(&mut *tx)
1093 .await
1094 {
1095 error!("DB error revoking JWT sessions: {:?}", e);
1096 return ApiError::InternalError(None).into_response();
1097 }
1098 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1099 .bind(&auth.0.did)
1100 .bind(jti)
1101 .execute(&mut *tx)
1102 .await
1103 {
1104 error!("DB error revoking OAuth sessions: {:?}", e);
1105 return ApiError::InternalError(None).into_response();
1106 }
1107 } else {
1108 if let Err(e) =
1109 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1110 .bind(&auth.0.did)
1111 .bind(jti)
1112 .execute(&mut *tx)
1113 .await
1114 {
1115 error!("DB error revoking JWT sessions: {:?}", e);
1116 return ApiError::InternalError(None).into_response();
1117 }
1118 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1119 .bind(&auth.0.did)
1120 .execute(&mut *tx)
1121 .await
1122 {
1123 error!("DB error revoking OAuth sessions: {:?}", e);
1124 return ApiError::InternalError(None).into_response();
1125 }
1126 }
1127
1128 if let Err(e) = tx.commit().await {
1129 error!("Failed to commit transaction: {:?}", e);
1130 return ApiError::InternalError(None).into_response();
1131 }
1132
1133 info!(did = %&auth.0.did, "All other sessions revoked");
1134 SuccessResponse::ok().into_response()
1135}
1136
/// Response body of `get_legacy_login_preference`; serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LegacyLoginPreferenceOutput {
    /// Value of the account's `allow_legacy_login` column.
    pub allow_legacy_login: bool,
    /// `true` when the account has a verified TOTP entry or at least one passkey.
    pub has_mfa: bool,
}
1143
1144pub async fn get_legacy_login_preference(
1145 State(state): State<AppState>,
1146 auth: BearerAuth,
1147) -> Response {
1148 let result = sqlx::query!(
1149 r#"SELECT
1150 u.allow_legacy_login,
1151 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1152 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1153 FROM users u WHERE u.did = $1"#,
1154 &auth.0.did
1155 )
1156 .fetch_optional(&state.db)
1157 .await;
1158
1159 match result {
1160 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1161 allow_legacy_login: row.allow_legacy_login,
1162 has_mfa: row.has_mfa,
1163 })
1164 .into_response(),
1165 Ok(None) => ApiError::AccountNotFound.into_response(),
1166 Err(e) => {
1167 error!("DB error: {:?}", e);
1168 ApiError::InternalError(None).into_response()
1169 }
1170 }
1171}
1172
/// Request body accepted by `update_legacy_login_preference`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLegacyLoginInput {
    /// New value to store in the account's `allow_legacy_login` column
    /// (JSON key `allowLegacyLogin`).
    pub allow_legacy_login: bool,
}
1178
1179pub async fn update_legacy_login_preference(
1180 State(state): State<AppState>,
1181 auth: BearerAuth,
1182 Json(input): Json<UpdateLegacyLoginInput>,
1183) -> Response {
1184 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1185 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1186 .await;
1187 }
1188
1189 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1190 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1191 }
1192
1193 let result = sqlx::query!(
1194 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1195 input.allow_legacy_login,
1196 &auth.0.did
1197 )
1198 .fetch_optional(&state.db)
1199 .await;
1200
1201 match result {
1202 Ok(Some(_)) => {
1203 info!(
1204 did = %&auth.0.did,
1205 allow_legacy_login = input.allow_legacy_login,
1206 "Legacy login preference updated"
1207 );
1208 Json(json!({
1209 "allowLegacyLogin": input.allow_legacy_login
1210 }))
1211 .into_response()
1212 }
1213 Ok(None) => ApiError::AccountNotFound.into_response(),
1214 Err(e) => {
1215 error!("DB error: {:?}", e);
1216 ApiError::InternalError(None).into_response()
1217 }
1218 }
1219}
1220
1221use crate::comms::VALID_LOCALES;
1222
/// Request body accepted by `update_locale`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateLocaleInput {
    /// Requested locale (JSON key `preferredLocale`); `update_locale` rejects
    /// any value not listed in `VALID_LOCALES`.
    pub preferred_locale: String,
}
1228
1229pub async fn update_locale(
1230 State(state): State<AppState>,
1231 auth: BearerAuth,
1232 Json(input): Json<UpdateLocaleInput>,
1233) -> Response {
1234 if !VALID_LOCALES.contains(&input.preferred_locale.as_str()) {
1235 return ApiError::InvalidRequest(format!(
1236 "Invalid locale. Valid options: {}",
1237 VALID_LOCALES.join(", ")
1238 ))
1239 .into_response();
1240 }
1241
1242 let result = sqlx::query!(
1243 "UPDATE users SET preferred_locale = $1 WHERE did = $2 RETURNING did",
1244 input.preferred_locale,
1245 &auth.0.did
1246 )
1247 .fetch_optional(&state.db)
1248 .await;
1249
1250 match result {
1251 Ok(Some(_)) => {
1252 info!(
1253 did = %&auth.0.did,
1254 locale = %input.preferred_locale,
1255 "User locale preference updated"
1256 );
1257 Json(json!({
1258 "preferredLocale": input.preferred_locale
1259 }))
1260 .into_response()
1261 }
1262 Ok(None) => ApiError::AccountNotFound.into_response(),
1263 Err(e) => {
1264 error!("DB error updating locale: {:?}", e);
1265 ApiError::InternalError(None).into_response()
1266 }
1267 }
1268}