this repo has no description
1use crate::api::ApiError;
2use crate::auth::{BearerAuth, BearerAuthAllowDeactivated};
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for")
18 && let Ok(value) = forwarded.to_str()
19 && let Some(first_ip) = value.split(',').next()
20 {
21 return first_ip.trim().to_string();
22 }
23 if let Some(real_ip) = headers.get("x-real-ip")
24 && let Ok(value) = real_ip.to_str()
25 {
26 return value.trim().to_string();
27 }
28 "unknown".to_string()
29}
30
31fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
32 let identifier = identifier.trim();
33 if identifier.contains('@') || identifier.starts_with("did:") {
34 identifier.to_string()
35 } else if !identifier.contains('.') {
36 format!("{}.{}", identifier.to_lowercase(), pds_hostname)
37 } else {
38 identifier.to_lowercase()
39 }
40}
41
42fn full_handle(stored_handle: &str, _pds_hostname: &str) -> String {
43 stored_handle.to_string()
44}
45
46#[derive(Deserialize)]
47pub struct CreateSessionInput {
48 pub identifier: String,
49 pub password: String,
50}
51
52#[derive(Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct CreateSessionOutput {
55 pub access_jwt: String,
56 pub refresh_jwt: String,
57 pub handle: String,
58 pub did: String,
59}
60
61pub async fn create_session(
62 State(state): State<AppState>,
63 headers: HeaderMap,
64 Json(input): Json<CreateSessionInput>,
65) -> Response {
66 info!(
67 "create_session called with identifier: {}",
68 input.identifier
69 );
70 let client_ip = extract_client_ip(&headers);
71 if !state
72 .check_rate_limit(RateLimitKind::Login, &client_ip)
73 .await
74 {
75 warn!(ip = %client_ip, "Login rate limit exceeded");
76 return (
77 StatusCode::TOO_MANY_REQUESTS,
78 Json(json!({
79 "error": "RateLimitExceeded",
80 "message": "Too many login attempts. Please try again later."
81 })),
82 )
83 .into_response();
84 }
85 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
86 let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
87 info!(
88 "Normalized identifier: {} -> {}",
89 input.identifier, normalized_identifier
90 );
91 let row = match sqlx::query!(
92 r#"SELECT
93 u.id, u.did, u.handle, u.password_hash,
94 u.email_verified, u.discord_verified, u.telegram_verified, u.signal_verified,
95 u.allow_legacy_login,
96 u.preferred_comms_channel as "preferred_comms_channel: crate::comms::CommsChannel",
97 k.key_bytes, k.encryption_version,
98 (SELECT verified FROM user_totp WHERE did = u.did) as totp_enabled
99 FROM users u
100 JOIN user_keys k ON u.id = k.user_id
101 WHERE u.handle = $1 OR u.email = $1 OR u.did = $1"#,
102 normalized_identifier
103 )
104 .fetch_optional(&state.db)
105 .await
106 {
107 Ok(Some(row)) => row,
108 Ok(None) => {
109 let _ = verify(
110 &input.password,
111 "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK",
112 );
113 warn!("User not found for login attempt");
114 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
115 .into_response();
116 }
117 Err(e) => {
118 error!("Database error fetching user: {:?}", e);
119 return ApiError::InternalError.into_response();
120 }
121 };
122 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
123 Ok(k) => k,
124 Err(e) => {
125 error!("Failed to decrypt user key: {:?}", e);
126 return ApiError::InternalError.into_response();
127 }
128 };
129 let password_valid = if row
130 .password_hash
131 .as_ref()
132 .map(|h| verify(&input.password, h).unwrap_or(false))
133 .unwrap_or(false)
134 {
135 true
136 } else {
137 let app_passwords = sqlx::query!(
138 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
139 row.id
140 )
141 .fetch_all(&state.db)
142 .await
143 .unwrap_or_default();
144 app_passwords
145 .iter()
146 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
147 };
148 if !password_valid {
149 warn!("Password verification failed for login attempt");
150 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into())
151 .into_response();
152 }
153 let is_verified =
154 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
155 if !is_verified {
156 warn!("Login attempt for unverified account: {}", row.did);
157 return (
158 StatusCode::FORBIDDEN,
159 Json(json!({
160 "error": "AccountNotVerified",
161 "message": "Please verify your account before logging in",
162 "did": row.did
163 })),
164 )
165 .into_response();
166 }
167 let has_totp = row.totp_enabled.unwrap_or(false);
168 let is_legacy_login = has_totp;
169 if has_totp && !row.allow_legacy_login {
170 warn!(
171 "Legacy login blocked for TOTP-enabled account: {}",
172 row.did
173 );
174 return (
175 StatusCode::FORBIDDEN,
176 Json(json!({
177 "error": "MfaRequired",
178 "message": "This account requires MFA. Please use an OAuth client that supports TOTP verification.",
179 "did": row.did
180 })),
181 )
182 .into_response();
183 }
184 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
185 Ok(m) => m,
186 Err(e) => {
187 error!("Failed to create access token: {:?}", e);
188 return ApiError::InternalError.into_response();
189 }
190 };
191 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
192 Ok(m) => m,
193 Err(e) => {
194 error!("Failed to create refresh token: {:?}", e);
195 return ApiError::InternalError.into_response();
196 }
197 };
198 if let Err(e) = sqlx::query!(
199 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
200 row.did,
201 access_meta.jti,
202 refresh_meta.jti,
203 access_meta.expires_at,
204 refresh_meta.expires_at,
205 is_legacy_login,
206 false
207 )
208 .execute(&state.db)
209 .await
210 {
211 error!("Failed to insert session: {:?}", e);
212 return ApiError::InternalError.into_response();
213 }
214 if is_legacy_login {
215 warn!(
216 did = %row.did,
217 ip = %client_ip,
218 "Legacy login on TOTP-enabled account - sending notification"
219 );
220 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
221 if let Err(e) = crate::comms::queue_legacy_login_notification(
222 &state.db,
223 row.id,
224 &hostname,
225 &client_ip,
226 row.preferred_comms_channel,
227 )
228 .await
229 {
230 error!("Failed to queue legacy login notification: {:?}", e);
231 }
232 }
233 let handle = full_handle(&row.handle, &pds_hostname);
234 Json(CreateSessionOutput {
235 access_jwt: access_meta.token,
236 refresh_jwt: refresh_meta.token,
237 handle,
238 did: row.did,
239 })
240 .into_response()
241}
242
243pub async fn get_session(
244 State(state): State<AppState>,
245 BearerAuthAllowDeactivated(auth_user): BearerAuthAllowDeactivated,
246) -> Response {
247 let permissions = auth_user.permissions();
248 let can_read_email = permissions.allows_email_read();
249
250 match sqlx::query!(
251 r#"SELECT
252 handle, email, email_verified, is_admin, deactivated_at,
253 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
254 discord_verified, telegram_verified, signal_verified
255 FROM users WHERE did = $1"#,
256 auth_user.did
257 )
258 .fetch_optional(&state.db)
259 .await
260 {
261 Ok(Some(row)) => {
262 let (preferred_channel, preferred_channel_verified) = match row.preferred_channel {
263 crate::comms::CommsChannel::Email => ("email", row.email_verified),
264 crate::comms::CommsChannel::Discord => ("discord", row.discord_verified),
265 crate::comms::CommsChannel::Telegram => ("telegram", row.telegram_verified),
266 crate::comms::CommsChannel::Signal => ("signal", row.signal_verified),
267 };
268 let pds_hostname =
269 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
270 let handle = full_handle(&row.handle, &pds_hostname);
271 let is_active = row.deactivated_at.is_none();
272 let email_value = if can_read_email {
273 row.email.clone()
274 } else {
275 None
276 };
277 let email_verified_value = can_read_email && row.email_verified;
278 Json(json!({
279 "handle": handle,
280 "did": auth_user.did,
281 "email": email_value,
282 "emailVerified": email_verified_value,
283 "preferredChannel": preferred_channel,
284 "preferredChannelVerified": preferred_channel_verified,
285 "isAdmin": row.is_admin,
286 "active": is_active,
287 "status": if is_active { "active" } else { "deactivated" },
288 "didDoc": {}
289 }))
290 .into_response()
291 }
292 Ok(None) => ApiError::AuthenticationFailed.into_response(),
293 Err(e) => {
294 error!("Database error in get_session: {:?}", e);
295 ApiError::InternalError.into_response()
296 }
297 }
298}
299
300pub async fn delete_session(
301 State(state): State<AppState>,
302 headers: axum::http::HeaderMap,
303) -> Response {
304 let token = match crate::auth::extract_bearer_token_from_header(
305 headers.get("Authorization").and_then(|h| h.to_str().ok()),
306 ) {
307 Some(t) => t,
308 None => return ApiError::AuthenticationRequired.into_response(),
309 };
310 let jti = match crate::auth::get_jti_from_token(&token) {
311 Ok(jti) => jti,
312 Err(_) => return ApiError::AuthenticationFailed.into_response(),
313 };
314 let did = crate::auth::get_did_from_token(&token).ok();
315 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
316 .execute(&state.db)
317 .await
318 {
319 Ok(res) if res.rows_affected() > 0 => {
320 if let Some(did) = did {
321 let session_cache_key = format!("auth:session:{}:{}", did, jti);
322 let _ = state.cache.delete(&session_cache_key).await;
323 }
324 Json(json!({})).into_response()
325 }
326 Ok(_) => ApiError::AuthenticationFailed.into_response(),
327 Err(e) => {
328 error!("Database error in delete_session: {:?}", e);
329 ApiError::AuthenticationFailed.into_response()
330 }
331 }
332}
333
334pub async fn refresh_session(
335 State(state): State<AppState>,
336 headers: axum::http::HeaderMap,
337) -> Response {
338 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
339 if !state
340 .check_rate_limit(RateLimitKind::RefreshSession, &client_ip)
341 .await
342 {
343 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
344 return (
345 axum::http::StatusCode::TOO_MANY_REQUESTS,
346 axum::Json(serde_json::json!({
347 "error": "RateLimitExceeded",
348 "message": "Too many requests. Please try again later."
349 })),
350 )
351 .into_response();
352 }
353 let refresh_token = match crate::auth::extract_bearer_token_from_header(
354 headers.get("Authorization").and_then(|h| h.to_str().ok()),
355 ) {
356 Some(t) => t,
357 None => return ApiError::AuthenticationRequired.into_response(),
358 };
359 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
360 Ok(jti) => jti,
361 Err(_) => {
362 return ApiError::AuthenticationFailedMsg("Invalid token format".into())
363 .into_response();
364 }
365 };
366 let mut tx = match state.db.begin().await {
367 Ok(tx) => tx,
368 Err(e) => {
369 error!("Failed to begin transaction: {:?}", e);
370 return ApiError::InternalError.into_response();
371 }
372 };
373 if let Ok(Some(session_id)) = sqlx::query_scalar!(
374 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
375 refresh_jti
376 )
377 .fetch_optional(&mut *tx)
378 .await
379 {
380 warn!(
381 "Refresh token reuse detected! Revoking token family for session_id: {}",
382 session_id
383 );
384 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
385 .execute(&mut *tx)
386 .await;
387 let _ = tx.commit().await;
388 return ApiError::ExpiredTokenMsg(
389 "Refresh token has been revoked due to suspected compromise".into(),
390 )
391 .into_response();
392 }
393 let session_row = match sqlx::query!(
394 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
395 FROM session_tokens st
396 JOIN users u ON st.did = u.did
397 JOIN user_keys k ON u.id = k.user_id
398 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
399 FOR UPDATE OF st"#,
400 refresh_jti
401 )
402 .fetch_optional(&mut *tx)
403 .await
404 {
405 Ok(Some(row)) => row,
406 Ok(None) => {
407 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into())
408 .into_response();
409 }
410 Err(e) => {
411 error!("Database error fetching session: {:?}", e);
412 return ApiError::InternalError.into_response();
413 }
414 };
415 let key_bytes =
416 match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
417 Ok(k) => k,
418 Err(e) => {
419 error!("Failed to decrypt user key: {:?}", e);
420 return ApiError::InternalError.into_response();
421 }
422 };
423 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
424 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
425 }
426 let new_access_meta =
427 match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
428 Ok(m) => m,
429 Err(e) => {
430 error!("Failed to create access token: {:?}", e);
431 return ApiError::InternalError.into_response();
432 }
433 };
434 let new_refresh_meta =
435 match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
436 Ok(m) => m,
437 Err(e) => {
438 error!("Failed to create refresh token: {:?}", e);
439 return ApiError::InternalError.into_response();
440 }
441 };
442 match sqlx::query!(
443 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
444 refresh_jti,
445 session_row.id
446 )
447 .execute(&mut *tx)
448 .await
449 {
450 Ok(result) if result.rows_affected() == 0 => {
451 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
452 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
453 .execute(&mut *tx)
454 .await;
455 let _ = tx.commit().await;
456 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
457 }
458 Err(e) => {
459 error!("Failed to record used refresh token: {:?}", e);
460 return ApiError::InternalError.into_response();
461 }
462 Ok(_) => {}
463 }
464 if let Err(e) = sqlx::query!(
465 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
466 new_access_meta.jti,
467 new_refresh_meta.jti,
468 new_access_meta.expires_at,
469 new_refresh_meta.expires_at,
470 session_row.id
471 )
472 .execute(&mut *tx)
473 .await
474 {
475 error!("Database error updating session: {:?}", e);
476 return ApiError::InternalError.into_response();
477 }
478 if let Err(e) = tx.commit().await {
479 error!("Failed to commit transaction: {:?}", e);
480 return ApiError::InternalError.into_response();
481 }
482 match sqlx::query!(
483 r#"SELECT
484 handle, email, email_verified, is_admin,
485 preferred_comms_channel as "preferred_channel: crate::comms::CommsChannel",
486 discord_verified, telegram_verified, signal_verified
487 FROM users WHERE did = $1"#,
488 session_row.did
489 )
490 .fetch_optional(&state.db)
491 .await
492 {
493 Ok(Some(u)) => {
494 let (preferred_channel, preferred_channel_verified) = match u.preferred_channel {
495 crate::comms::CommsChannel::Email => ("email", u.email_verified),
496 crate::comms::CommsChannel::Discord => ("discord", u.discord_verified),
497 crate::comms::CommsChannel::Telegram => ("telegram", u.telegram_verified),
498 crate::comms::CommsChannel::Signal => ("signal", u.signal_verified),
499 };
500 let pds_hostname =
501 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
502 let handle = full_handle(&u.handle, &pds_hostname);
503 Json(json!({
504 "accessJwt": new_access_meta.token,
505 "refreshJwt": new_refresh_meta.token,
506 "handle": handle,
507 "did": session_row.did,
508 "email": u.email,
509 "emailVerified": u.email_verified,
510 "preferredChannel": preferred_channel,
511 "preferredChannelVerified": preferred_channel_verified,
512 "isAdmin": u.is_admin,
513 "active": true
514 }))
515 .into_response()
516 }
517 Ok(None) => {
518 error!("User not found for existing session: {}", session_row.did);
519 ApiError::InternalError.into_response()
520 }
521 Err(e) => {
522 error!("Database error fetching user: {:?}", e);
523 ApiError::InternalError.into_response()
524 }
525 }
526}
527
528#[derive(Deserialize)]
529#[serde(rename_all = "camelCase")]
530pub struct ConfirmSignupInput {
531 pub did: String,
532 pub verification_code: String,
533}
534
535#[derive(Serialize)]
536#[serde(rename_all = "camelCase")]
537pub struct ConfirmSignupOutput {
538 pub access_jwt: String,
539 pub refresh_jwt: String,
540 pub handle: String,
541 pub did: String,
542 pub email: Option<String>,
543 pub email_verified: bool,
544 pub preferred_channel: String,
545 pub preferred_channel_verified: bool,
546}
547
548pub async fn confirm_signup(
549 State(state): State<AppState>,
550 Json(input): Json<ConfirmSignupInput>,
551) -> Response {
552 info!("confirm_signup called for DID: {}", input.did);
553 let row = match sqlx::query!(
554 r#"SELECT
555 u.id, u.did, u.handle, u.email,
556 u.preferred_comms_channel as "channel: crate::comms::CommsChannel",
557 k.key_bytes, k.encryption_version
558 FROM users u
559 JOIN user_keys k ON u.id = k.user_id
560 WHERE u.did = $1"#,
561 input.did
562 )
563 .fetch_optional(&state.db)
564 .await
565 {
566 Ok(Some(row)) => row,
567 Ok(None) => {
568 warn!("User not found for confirm_signup: {}", input.did);
569 return ApiError::InvalidRequest("Invalid DID or verification code".into())
570 .into_response();
571 }
572 Err(e) => {
573 error!("Database error in confirm_signup: {:?}", e);
574 return ApiError::InternalError.into_response();
575 }
576 };
577
578 let channel_str = match row.channel {
579 crate::comms::CommsChannel::Email => "email",
580 crate::comms::CommsChannel::Discord => "discord",
581 crate::comms::CommsChannel::Telegram => "telegram",
582 crate::comms::CommsChannel::Signal => "signal",
583 };
584 let verification = match sqlx::query!(
585 "SELECT code, expires_at FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel",
586 row.id,
587 channel_str as _
588 )
589 .fetch_optional(&state.db)
590 .await
591 {
592 Ok(Some(v)) => v,
593 Ok(None) => {
594 warn!("No verification code found for user: {}", input.did);
595 return ApiError::InvalidRequest("No pending verification".into()).into_response();
596 }
597 Err(e) => {
598 error!("Database error fetching verification: {:?}", e);
599 return ApiError::InternalError.into_response();
600 }
601 };
602
603 if verification.code != input.verification_code {
604 warn!("Invalid verification code for user: {}", input.did);
605 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
606 }
607 if verification.expires_at < Utc::now() {
608 warn!("Verification code expired for user: {}", input.did);
609 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
610 }
611
612 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
613 Ok(k) => k,
614 Err(e) => {
615 error!("Failed to decrypt user key: {:?}", e);
616 return ApiError::InternalError.into_response();
617 }
618 };
619 let verified_column = match row.channel {
620 crate::comms::CommsChannel::Email => "email_verified",
621 crate::comms::CommsChannel::Discord => "discord_verified",
622 crate::comms::CommsChannel::Telegram => "telegram_verified",
623 crate::comms::CommsChannel::Signal => "signal_verified",
624 };
625 let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column);
626 if let Err(e) = sqlx::query(&update_query)
627 .bind(&input.did)
628 .execute(&state.db)
629 .await
630 {
631 error!("Failed to update verification status: {:?}", e);
632 return ApiError::InternalError.into_response();
633 }
634
635 if let Err(e) = sqlx::query!(
636 "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel",
637 row.id,
638 channel_str as _
639 )
640 .execute(&state.db)
641 .await
642 {
643 error!("Failed to delete verification record: {:?}", e);
644 }
645
646 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
647 Ok(m) => m,
648 Err(e) => {
649 error!("Failed to create access token: {:?}", e);
650 return ApiError::InternalError.into_response();
651 }
652 };
653 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
654 Ok(m) => m,
655 Err(e) => {
656 error!("Failed to create refresh token: {:?}", e);
657 return ApiError::InternalError.into_response();
658 }
659 };
660 if let Err(e) = sqlx::query!(
661 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified) VALUES ($1, $2, $3, $4, $5, $6, $7)",
662 row.did,
663 access_meta.jti,
664 refresh_meta.jti,
665 access_meta.expires_at,
666 refresh_meta.expires_at,
667 false,
668 false
669 )
670 .execute(&state.db)
671 .await
672 {
673 error!("Failed to insert session: {:?}", e);
674 return ApiError::InternalError.into_response();
675 }
676 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
677 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await {
678 warn!("Failed to enqueue welcome notification: {:?}", e);
679 }
680 let email_verified = matches!(row.channel, crate::comms::CommsChannel::Email);
681 let preferred_channel = match row.channel {
682 crate::comms::CommsChannel::Email => "email",
683 crate::comms::CommsChannel::Discord => "discord",
684 crate::comms::CommsChannel::Telegram => "telegram",
685 crate::comms::CommsChannel::Signal => "signal",
686 };
687 Json(ConfirmSignupOutput {
688 access_jwt: access_meta.token,
689 refresh_jwt: refresh_meta.token,
690 handle: row.handle,
691 did: row.did,
692 email: row.email,
693 email_verified,
694 preferred_channel: preferred_channel.to_string(),
695 preferred_channel_verified: true,
696 })
697 .into_response()
698}
699
700#[derive(Deserialize)]
701#[serde(rename_all = "camelCase")]
702pub struct ResendVerificationInput {
703 pub did: String,
704}
705
706pub async fn resend_verification(
707 State(state): State<AppState>,
708 Json(input): Json<ResendVerificationInput>,
709) -> Response {
710 info!("resend_verification called for DID: {}", input.did);
711 let row = match sqlx::query!(
712 r#"SELECT
713 id, handle, email,
714 preferred_comms_channel as "channel: crate::comms::CommsChannel",
715 discord_id, telegram_username, signal_number,
716 email_verified, discord_verified, telegram_verified, signal_verified
717 FROM users
718 WHERE did = $1"#,
719 input.did
720 )
721 .fetch_optional(&state.db)
722 .await
723 {
724 Ok(Some(row)) => row,
725 Ok(None) => {
726 return ApiError::InvalidRequest("User not found".into()).into_response();
727 }
728 Err(e) => {
729 error!("Database error in resend_verification: {:?}", e);
730 return ApiError::InternalError.into_response();
731 }
732 };
733 let is_verified =
734 row.email_verified || row.discord_verified || row.telegram_verified || row.signal_verified;
735 if is_verified {
736 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
737 }
738 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
739 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
740
741 let (channel_str, recipient) = match row.channel {
742 crate::comms::CommsChannel::Email => ("email", row.email.clone().unwrap_or_default()),
743 crate::comms::CommsChannel::Discord => {
744 ("discord", row.discord_id.clone().unwrap_or_default())
745 }
746 crate::comms::CommsChannel::Telegram => (
747 "telegram",
748 row.telegram_username.clone().unwrap_or_default(),
749 ),
750 crate::comms::CommsChannel::Signal => {
751 ("signal", row.signal_number.clone().unwrap_or_default())
752 }
753 };
754
755 if let Err(e) = sqlx::query!(
756 r#"
757 INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at)
758 VALUES ($1, $2::comms_channel, $3, $4, $5)
759 ON CONFLICT (user_id, channel) DO UPDATE
760 SET code = $3, pending_identifier = $4, expires_at = $5, created_at = NOW()
761 "#,
762 row.id,
763 channel_str as _,
764 verification_code,
765 recipient,
766 code_expires_at
767 )
768 .execute(&state.db)
769 .await
770 {
771 error!("Failed to update verification code: {:?}", e);
772 return ApiError::InternalError.into_response();
773 }
774 if let Err(e) = crate::comms::enqueue_signup_verification(
775 &state.db,
776 row.id,
777 channel_str,
778 &recipient,
779 &verification_code,
780 )
781 .await
782 {
783 warn!("Failed to enqueue verification notification: {:?}", e);
784 }
785 Json(json!({"success": true})).into_response()
786}
787
788#[derive(Serialize)]
789#[serde(rename_all = "camelCase")]
790pub struct SessionInfo {
791 pub id: String,
792 pub session_type: String,
793 pub client_name: Option<String>,
794 pub created_at: String,
795 pub expires_at: String,
796 pub is_current: bool,
797}
798
799#[derive(Serialize)]
800#[serde(rename_all = "camelCase")]
801pub struct ListSessionsOutput {
802 pub sessions: Vec<SessionInfo>,
803}
804
805pub async fn list_sessions(
806 State(state): State<AppState>,
807 headers: HeaderMap,
808 auth: BearerAuth,
809) -> Response {
810 let current_jti = headers
811 .get("authorization")
812 .and_then(|v| v.to_str().ok())
813 .and_then(|v| v.strip_prefix("Bearer "))
814 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
815
816 let mut sessions: Vec<SessionInfo> = Vec::new();
817
818 let jwt_result = sqlx::query_as::<
819 _,
820 (
821 i32,
822 String,
823 chrono::DateTime<chrono::Utc>,
824 chrono::DateTime<chrono::Utc>,
825 ),
826 >(
827 r#"
828 SELECT id, access_jti, created_at, refresh_expires_at
829 FROM session_tokens
830 WHERE did = $1 AND refresh_expires_at > NOW()
831 ORDER BY created_at DESC
832 "#,
833 )
834 .bind(&auth.0.did)
835 .fetch_all(&state.db)
836 .await;
837
838 match jwt_result {
839 Ok(rows) => {
840 for (id, access_jti, created_at, expires_at) in rows {
841 sessions.push(SessionInfo {
842 id: format!("jwt:{}", id),
843 session_type: "legacy".to_string(),
844 client_name: None,
845 created_at: created_at.to_rfc3339(),
846 expires_at: expires_at.to_rfc3339(),
847 is_current: current_jti.as_ref() == Some(&access_jti),
848 });
849 }
850 }
851 Err(e) => {
852 error!("DB error fetching JWT sessions: {:?}", e);
853 return (
854 StatusCode::INTERNAL_SERVER_ERROR,
855 Json(json!({"error": "InternalError"})),
856 )
857 .into_response();
858 }
859 }
860
861 let oauth_result = sqlx::query_as::<
862 _,
863 (
864 i32,
865 String,
866 chrono::DateTime<chrono::Utc>,
867 chrono::DateTime<chrono::Utc>,
868 String,
869 ),
870 >(
871 r#"
872 SELECT id, token_id, created_at, expires_at, client_id
873 FROM oauth_token
874 WHERE did = $1 AND expires_at > NOW()
875 ORDER BY created_at DESC
876 "#,
877 )
878 .bind(&auth.0.did)
879 .fetch_all(&state.db)
880 .await;
881
882 match oauth_result {
883 Ok(rows) => {
884 for (id, token_id, created_at, expires_at, client_id) in rows {
885 let client_name = extract_client_name(&client_id);
886 let is_current_oauth = auth.0.is_oauth
887 && current_jti.as_ref() == Some(&token_id);
888 sessions.push(SessionInfo {
889 id: format!("oauth:{}", id),
890 session_type: "oauth".to_string(),
891 client_name: Some(client_name),
892 created_at: created_at.to_rfc3339(),
893 expires_at: expires_at.to_rfc3339(),
894 is_current: is_current_oauth,
895 });
896 }
897 }
898 Err(e) => {
899 error!("DB error fetching OAuth sessions: {:?}", e);
900 return (
901 StatusCode::INTERNAL_SERVER_ERROR,
902 Json(json!({"error": "InternalError"})),
903 )
904 .into_response();
905 }
906 }
907
908 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
909
910 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response()
911}
912
913fn extract_client_name(client_id: &str) -> String {
914 if client_id.starts_with("http://localhost") || client_id.starts_with("http://127.0.0.1") {
915 "Localhost App".to_string()
916 } else if let Ok(parsed) = reqwest::Url::parse(client_id) {
917 parsed.host_str().unwrap_or("Unknown App").to_string()
918 } else {
919 client_id.to_string()
920 }
921}
922
923#[derive(Deserialize)]
924#[serde(rename_all = "camelCase")]
925pub struct RevokeSessionInput {
926 pub session_id: String,
927}
928
929pub async fn revoke_session(
930 State(state): State<AppState>,
931 auth: BearerAuth,
932 Json(input): Json<RevokeSessionInput>,
933) -> Response {
934 if let Some(jwt_id) = input.session_id.strip_prefix("jwt:") {
935 let session_id: i32 = match jwt_id.parse() {
936 Ok(id) => id,
937 Err(_) => {
938 return (
939 StatusCode::BAD_REQUEST,
940 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
941 )
942 .into_response();
943 }
944 };
945 let session = sqlx::query_as::<_, (String,)>(
946 "SELECT access_jti FROM session_tokens WHERE id = $1 AND did = $2",
947 )
948 .bind(session_id)
949 .bind(&auth.0.did)
950 .fetch_optional(&state.db)
951 .await;
952 let access_jti = match session {
953 Ok(Some((jti,))) => jti,
954 Ok(None) => {
955 return (
956 StatusCode::NOT_FOUND,
957 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
958 )
959 .into_response();
960 }
961 Err(e) => {
962 error!("DB error in revoke_session: {:?}", e);
963 return (
964 StatusCode::INTERNAL_SERVER_ERROR,
965 Json(json!({"error": "InternalError"})),
966 )
967 .into_response();
968 }
969 };
970 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE id = $1")
971 .bind(session_id)
972 .execute(&state.db)
973 .await
974 {
975 error!("DB error deleting session: {:?}", e);
976 return (
977 StatusCode::INTERNAL_SERVER_ERROR,
978 Json(json!({"error": "InternalError"})),
979 )
980 .into_response();
981 }
982 let cache_key = format!("auth:session:{}:{}", auth.0.did, access_jti);
983 if let Err(e) = state.cache.delete(&cache_key).await {
984 warn!("Failed to invalidate session cache: {:?}", e);
985 }
986 info!(did = %auth.0.did, session_id = %session_id, "JWT session revoked");
987 } else if let Some(oauth_id) = input.session_id.strip_prefix("oauth:") {
988 let session_id: i32 = match oauth_id.parse() {
989 Ok(id) => id,
990 Err(_) => {
991 return (
992 StatusCode::BAD_REQUEST,
993 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID"})),
994 )
995 .into_response();
996 }
997 };
998 let result = sqlx::query("DELETE FROM oauth_token WHERE id = $1 AND did = $2")
999 .bind(session_id)
1000 .bind(&auth.0.did)
1001 .execute(&state.db)
1002 .await;
1003 match result {
1004 Ok(r) if r.rows_affected() == 0 => {
1005 return (
1006 StatusCode::NOT_FOUND,
1007 Json(json!({"error": "SessionNotFound", "message": "Session not found"})),
1008 )
1009 .into_response();
1010 }
1011 Err(e) => {
1012 error!("DB error deleting OAuth session: {:?}", e);
1013 return (
1014 StatusCode::INTERNAL_SERVER_ERROR,
1015 Json(json!({"error": "InternalError"})),
1016 )
1017 .into_response();
1018 }
1019 _ => {}
1020 }
1021 info!(did = %auth.0.did, session_id = %session_id, "OAuth session revoked");
1022 } else {
1023 return (
1024 StatusCode::BAD_REQUEST,
1025 Json(json!({"error": "InvalidRequest", "message": "Invalid session ID format"})),
1026 )
1027 .into_response();
1028 }
1029 (StatusCode::OK, Json(json!({}))).into_response()
1030}
1031
1032pub async fn revoke_all_sessions(
1033 State(state): State<AppState>,
1034 headers: HeaderMap,
1035 auth: BearerAuth,
1036) -> Response {
1037 let current_jti = headers
1038 .get("authorization")
1039 .and_then(|v| v.to_str().ok())
1040 .and_then(|v| v.strip_prefix("Bearer "))
1041 .and_then(|token| crate::auth::get_jti_from_token(token).ok());
1042
1043 if let Some(ref jti) = current_jti {
1044 if auth.0.is_oauth {
1045 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1")
1046 .bind(&auth.0.did)
1047 .execute(&state.db)
1048 .await
1049 {
1050 error!("DB error revoking JWT sessions: {:?}", e);
1051 return (
1052 StatusCode::INTERNAL_SERVER_ERROR,
1053 Json(json!({"error": "InternalError"})),
1054 )
1055 .into_response();
1056 }
1057 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2")
1058 .bind(&auth.0.did)
1059 .bind(jti)
1060 .execute(&state.db)
1061 .await
1062 {
1063 error!("DB error revoking OAuth sessions: {:?}", e);
1064 return (
1065 StatusCode::INTERNAL_SERVER_ERROR,
1066 Json(json!({"error": "InternalError"})),
1067 )
1068 .into_response();
1069 }
1070 } else {
1071 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2")
1072 .bind(&auth.0.did)
1073 .bind(jti)
1074 .execute(&state.db)
1075 .await
1076 {
1077 error!("DB error revoking JWT sessions: {:?}", e);
1078 return (
1079 StatusCode::INTERNAL_SERVER_ERROR,
1080 Json(json!({"error": "InternalError"})),
1081 )
1082 .into_response();
1083 }
1084 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1")
1085 .bind(&auth.0.did)
1086 .execute(&state.db)
1087 .await
1088 {
1089 error!("DB error revoking OAuth sessions: {:?}", e);
1090 return (
1091 StatusCode::INTERNAL_SERVER_ERROR,
1092 Json(json!({"error": "InternalError"})),
1093 )
1094 .into_response();
1095 }
1096 }
1097 } else {
1098 return (
1099 StatusCode::BAD_REQUEST,
1100 Json(json!({"error": "InvalidToken", "message": "Could not identify current session"})),
1101 )
1102 .into_response();
1103 }
1104
1105 info!(did = %auth.0.did, "All other sessions revoked");
1106 (StatusCode::OK, Json(json!({"success": true}))).into_response()
1107}
1108
1109#[derive(Serialize)]
1110#[serde(rename_all = "camelCase")]
1111pub struct LegacyLoginPreferenceOutput {
1112 pub allow_legacy_login: bool,
1113 pub has_mfa: bool,
1114}
1115
1116pub async fn get_legacy_login_preference(
1117 State(state): State<AppState>,
1118 auth: BearerAuth,
1119) -> Response {
1120 let result = sqlx::query!(
1121 r#"SELECT
1122 u.allow_legacy_login,
1123 (EXISTS(SELECT 1 FROM user_totp t WHERE t.did = u.did AND t.verified = TRUE) OR
1124 EXISTS(SELECT 1 FROM passkeys p WHERE p.did = u.did)) as "has_mfa!"
1125 FROM users u WHERE u.did = $1"#,
1126 auth.0.did
1127 )
1128 .fetch_optional(&state.db)
1129 .await;
1130
1131 match result {
1132 Ok(Some(row)) => Json(LegacyLoginPreferenceOutput {
1133 allow_legacy_login: row.allow_legacy_login,
1134 has_mfa: row.has_mfa,
1135 })
1136 .into_response(),
1137 Ok(None) => (
1138 StatusCode::NOT_FOUND,
1139 Json(json!({"error": "AccountNotFound"})),
1140 )
1141 .into_response(),
1142 Err(e) => {
1143 error!("DB error: {:?}", e);
1144 (
1145 StatusCode::INTERNAL_SERVER_ERROR,
1146 Json(json!({"error": "InternalError"})),
1147 )
1148 .into_response()
1149 }
1150 }
1151}
1152
1153#[derive(Deserialize)]
1154#[serde(rename_all = "camelCase")]
1155pub struct UpdateLegacyLoginInput {
1156 pub allow_legacy_login: bool,
1157}
1158
1159pub async fn update_legacy_login_preference(
1160 State(state): State<AppState>,
1161 auth: BearerAuth,
1162 Json(input): Json<UpdateLegacyLoginInput>,
1163) -> Response {
1164 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &auth.0.did).await {
1165 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &auth.0.did)
1166 .await;
1167 }
1168
1169 if crate::api::server::reauth::check_reauth_required(&state.db, &auth.0.did).await {
1170 return crate::api::server::reauth::reauth_required_response(&state.db, &auth.0.did).await;
1171 }
1172
1173 let result = sqlx::query!(
1174 "UPDATE users SET allow_legacy_login = $1 WHERE did = $2 RETURNING did",
1175 input.allow_legacy_login,
1176 auth.0.did
1177 )
1178 .fetch_optional(&state.db)
1179 .await;
1180
1181 match result {
1182 Ok(Some(_)) => {
1183 info!(
1184 did = %auth.0.did,
1185 allow_legacy_login = input.allow_legacy_login,
1186 "Legacy login preference updated"
1187 );
1188 Json(json!({
1189 "allowLegacyLogin": input.allow_legacy_login
1190 }))
1191 .into_response()
1192 }
1193 Ok(None) => (
1194 StatusCode::NOT_FOUND,
1195 Json(json!({"error": "AccountNotFound"})),
1196 )
1197 .into_response(),
1198 Err(e) => {
1199 error!("DB error: {:?}", e);
1200 (
1201 StatusCode::INTERNAL_SERVER_ERROR,
1202 Json(json!({"error": "InternalError"})),
1203 )
1204 .into_response()
1205 }
1206 }
1207}