this repo has no description
1use crate::api::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::{AppState, RateLimitKind};
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for") {
18 if let Ok(value) = forwarded.to_str() {
19 if let Some(first_ip) = value.split(',').next() {
20 return first_ip.trim().to_string();
21 }
22 }
23 }
24 if let Some(real_ip) = headers.get("x-real-ip") {
25 if let Ok(value) = real_ip.to_str() {
26 return value.trim().to_string();
27 }
28 }
29 "unknown".to_string()
30}
31
32#[derive(Deserialize)]
33pub struct CreateSessionInput {
34 pub identifier: String,
35 pub password: String,
36}
37
38#[derive(Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateSessionOutput {
41 pub access_jwt: String,
42 pub refresh_jwt: String,
43 pub handle: String,
44 pub did: String,
45}
46
47pub async fn create_session(
48 State(state): State<AppState>,
49 headers: HeaderMap,
50 Json(input): Json<CreateSessionInput>,
51) -> Response {
52 info!("create_session called");
53
54 let client_ip = extract_client_ip(&headers);
55 if !state.check_rate_limit(RateLimitKind::Login, &client_ip).await {
56 warn!(ip = %client_ip, "Login rate limit exceeded");
57 return (
58 StatusCode::TOO_MANY_REQUESTS,
59 Json(json!({
60 "error": "RateLimitExceeded",
61 "message": "Too many login attempts. Please try again later."
62 })),
63 )
64 .into_response();
65 }
66
67 let row = match sqlx::query!(
68 r#"SELECT
69 u.id, u.did, u.handle, u.password_hash,
70 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified,
71 k.key_bytes, k.encryption_version
72 FROM users u
73 JOIN user_keys k ON u.id = k.user_id
74 WHERE u.handle = $1 OR u.email = $1"#,
75 input.identifier
76 )
77 .fetch_optional(&state.db)
78 .await
79 {
80 Ok(Some(row)) => row,
81 Ok(None) => {
82 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK");
83 warn!("User not found for login attempt");
84 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
85 }
86 Err(e) => {
87 error!("Database error fetching user: {:?}", e);
88 return ApiError::InternalError.into_response();
89 }
90 };
91
92 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
93 Ok(k) => k,
94 Err(e) => {
95 error!("Failed to decrypt user key: {:?}", e);
96 return ApiError::InternalError.into_response();
97 }
98 };
99
100 let password_valid = if verify(&input.password, &row.password_hash).unwrap_or(false) {
101 true
102 } else {
103 let app_passwords = sqlx::query!(
104 "SELECT password_hash FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
105 row.id
106 )
107 .fetch_all(&state.db)
108 .await
109 .unwrap_or_default();
110
111 app_passwords.iter().any(|app| verify(&input.password, &app.password_hash).unwrap_or(false))
112 };
113
114 if !password_valid {
115 warn!("Password verification failed for login attempt");
116 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
117 }
118
119 let is_verified = row.email_confirmed
120 || row.discord_verified
121 || row.telegram_verified
122 || row.signal_verified;
123
124 if !is_verified {
125 warn!("Login attempt for unverified account: {}", row.did);
126 return (
127 StatusCode::FORBIDDEN,
128 Json(json!({
129 "error": "AccountNotVerified",
130 "message": "Please verify your account before logging in",
131 "did": row.did
132 })),
133 ).into_response();
134 }
135
136 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
137 Ok(m) => m,
138 Err(e) => {
139 error!("Failed to create access token: {:?}", e);
140 return ApiError::InternalError.into_response();
141 }
142 };
143
144 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
145 Ok(m) => m,
146 Err(e) => {
147 error!("Failed to create refresh token: {:?}", e);
148 return ApiError::InternalError.into_response();
149 }
150 };
151
152 if let Err(e) = sqlx::query!(
153 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
154 row.did,
155 access_meta.jti,
156 refresh_meta.jti,
157 access_meta.expires_at,
158 refresh_meta.expires_at
159 )
160 .execute(&state.db)
161 .await
162 {
163 error!("Failed to insert session: {:?}", e);
164 return ApiError::InternalError.into_response();
165 }
166
167 Json(CreateSessionOutput {
168 access_jwt: access_meta.token,
169 refresh_jwt: refresh_meta.token,
170 handle: row.handle,
171 did: row.did,
172 }).into_response()
173}
174
175pub async fn get_session(
176 State(state): State<AppState>,
177 BearerAuth(auth_user): BearerAuth,
178) -> Response {
179 match sqlx::query!("SELECT handle, email FROM users WHERE did = $1", auth_user.did)
180 .fetch_optional(&state.db)
181 .await
182 {
183 Ok(Some(row)) => Json(json!({
184 "handle": row.handle,
185 "did": auth_user.did,
186 "email": row.email,
187 "didDoc": {}
188 })).into_response(),
189 Ok(None) => ApiError::AuthenticationFailed.into_response(),
190 Err(e) => {
191 error!("Database error in get_session: {:?}", e);
192 ApiError::InternalError.into_response()
193 }
194 }
195}
196
197pub async fn delete_session(
198 State(state): State<AppState>,
199 headers: axum::http::HeaderMap,
200) -> Response {
201 let token = match crate::auth::extract_bearer_token_from_header(
202 headers.get("Authorization").and_then(|h| h.to_str().ok())
203 ) {
204 Some(t) => t,
205 None => return ApiError::AuthenticationRequired.into_response(),
206 };
207
208 let jti = match crate::auth::get_jti_from_token(&token) {
209 Ok(jti) => jti,
210 Err(_) => return ApiError::AuthenticationFailed.into_response(),
211 };
212
213 let did = crate::auth::get_did_from_token(&token).ok();
214
215 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
216 .execute(&state.db)
217 .await
218 {
219 Ok(res) if res.rows_affected() > 0 => {
220 if let Some(did) = did {
221 let session_cache_key = format!("auth:session:{}:{}", did, jti);
222 let _ = state.cache.delete(&session_cache_key).await;
223 }
224 Json(json!({})).into_response()
225 }
226 Ok(_) => ApiError::AuthenticationFailed.into_response(),
227 Err(e) => {
228 error!("Database error in delete_session: {:?}", e);
229 ApiError::AuthenticationFailed.into_response()
230 }
231 }
232}
233
234pub async fn refresh_session(
235 State(state): State<AppState>,
236 headers: axum::http::HeaderMap,
237) -> Response {
238 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
239 if !state.check_rate_limit(RateLimitKind::RefreshSession, &client_ip).await {
240 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
241 return (
242 axum::http::StatusCode::TOO_MANY_REQUESTS,
243 axum::Json(serde_json::json!({
244 "error": "RateLimitExceeded",
245 "message": "Too many requests. Please try again later."
246 })),
247 ).into_response();
248 }
249
250 let refresh_token = match crate::auth::extract_bearer_token_from_header(
251 headers.get("Authorization").and_then(|h| h.to_str().ok())
252 ) {
253 Some(t) => t,
254 None => return ApiError::AuthenticationRequired.into_response(),
255 };
256
257 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
258 Ok(jti) => jti,
259 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(),
260 };
261
262 let mut tx = match state.db.begin().await {
263 Ok(tx) => tx,
264 Err(e) => {
265 error!("Failed to begin transaction: {:?}", e);
266 return ApiError::InternalError.into_response();
267 }
268 };
269
270 if let Ok(Some(session_id)) = sqlx::query_scalar!(
271 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
272 refresh_jti
273 )
274 .fetch_optional(&mut *tx)
275 .await
276 {
277 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id);
278 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
279 .execute(&mut *tx)
280 .await;
281 let _ = tx.commit().await;
282 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
283 }
284
285 let session_row = match sqlx::query!(
286 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
287 FROM session_tokens st
288 JOIN users u ON st.did = u.did
289 JOIN user_keys k ON u.id = k.user_id
290 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
291 FOR UPDATE OF st"#,
292 refresh_jti
293 )
294 .fetch_optional(&mut *tx)
295 .await
296 {
297 Ok(Some(row)) => row,
298 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(),
299 Err(e) => {
300 error!("Database error fetching session: {:?}", e);
301 return ApiError::InternalError.into_response();
302 }
303 };
304
305 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
306 Ok(k) => k,
307 Err(e) => {
308 error!("Failed to decrypt user key: {:?}", e);
309 return ApiError::InternalError.into_response();
310 }
311 };
312
313 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
314 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
315 }
316
317 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
318 Ok(m) => m,
319 Err(e) => {
320 error!("Failed to create access token: {:?}", e);
321 return ApiError::InternalError.into_response();
322 }
323 };
324
325 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
326 Ok(m) => m,
327 Err(e) => {
328 error!("Failed to create refresh token: {:?}", e);
329 return ApiError::InternalError.into_response();
330 }
331 };
332
333 match sqlx::query!(
334 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
335 refresh_jti,
336 session_row.id
337 )
338 .execute(&mut *tx)
339 .await
340 {
341 Ok(result) if result.rows_affected() == 0 => {
342 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
343 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
344 .execute(&mut *tx)
345 .await;
346 let _ = tx.commit().await;
347 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
348 }
349 Err(e) => {
350 error!("Failed to record used refresh token: {:?}", e);
351 return ApiError::InternalError.into_response();
352 }
353 Ok(_) => {}
354 }
355
356 if let Err(e) = sqlx::query!(
357 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
358 new_access_meta.jti,
359 new_refresh_meta.jti,
360 new_access_meta.expires_at,
361 new_refresh_meta.expires_at,
362 session_row.id
363 )
364 .execute(&mut *tx)
365 .await
366 {
367 error!("Database error updating session: {:?}", e);
368 return ApiError::InternalError.into_response();
369 }
370
371 if let Err(e) = tx.commit().await {
372 error!("Failed to commit transaction: {:?}", e);
373 return ApiError::InternalError.into_response();
374 }
375
376 match sqlx::query!("SELECT handle FROM users WHERE did = $1", session_row.did)
377 .fetch_optional(&state.db)
378 .await
379 {
380 Ok(Some(u)) => Json(json!({
381 "accessJwt": new_access_meta.token,
382 "refreshJwt": new_refresh_meta.token,
383 "handle": u.handle,
384 "did": session_row.did
385 })).into_response(),
386 Ok(None) => {
387 error!("User not found for existing session: {}", session_row.did);
388 ApiError::InternalError.into_response()
389 }
390 Err(e) => {
391 error!("Database error fetching user: {:?}", e);
392 ApiError::InternalError.into_response()
393 }
394 }
395}
396
397#[derive(Deserialize)]
398#[serde(rename_all = "camelCase")]
399pub struct ConfirmSignupInput {
400 pub did: String,
401 pub verification_code: String,
402}
403
404#[derive(Serialize)]
405#[serde(rename_all = "camelCase")]
406pub struct ConfirmSignupOutput {
407 pub access_jwt: String,
408 pub refresh_jwt: String,
409 pub handle: String,
410 pub did: String,
411}
412
413pub async fn confirm_signup(
414 State(state): State<AppState>,
415 Json(input): Json<ConfirmSignupInput>,
416) -> Response {
417 info!("confirm_signup called for DID: {}", input.did);
418
419 let row = match sqlx::query!(
420 r#"SELECT
421 u.id, u.did, u.handle,
422 u.email_confirmation_code,
423 u.email_confirmation_code_expires_at,
424 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
425 k.key_bytes, k.encryption_version
426 FROM users u
427 JOIN user_keys k ON u.id = k.user_id
428 WHERE u.did = $1"#,
429 input.did
430 )
431 .fetch_optional(&state.db)
432 .await
433 {
434 Ok(Some(row)) => row,
435 Ok(None) => {
436 warn!("User not found for confirm_signup: {}", input.did);
437 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response();
438 }
439 Err(e) => {
440 error!("Database error in confirm_signup: {:?}", e);
441 return ApiError::InternalError.into_response();
442 }
443 };
444
445 let stored_code = match &row.email_confirmation_code {
446 Some(code) => code,
447 None => {
448 warn!("No verification code found for user: {}", input.did);
449 return ApiError::InvalidRequest("No pending verification".into()).into_response();
450 }
451 };
452
453 if stored_code != &input.verification_code {
454 warn!("Invalid verification code for user: {}", input.did);
455 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
456 }
457
458 if let Some(expires_at) = row.email_confirmation_code_expires_at {
459 if expires_at < Utc::now() {
460 warn!("Verification code expired for user: {}", input.did);
461 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
462 }
463 }
464
465 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
466 Ok(k) => k,
467 Err(e) => {
468 error!("Failed to decrypt user key: {:?}", e);
469 return ApiError::InternalError.into_response();
470 }
471 };
472
473 let verified_column = match row.channel {
474 crate::notifications::NotificationChannel::Email => "email_confirmed",
475 crate::notifications::NotificationChannel::Discord => "discord_verified",
476 crate::notifications::NotificationChannel::Telegram => "telegram_verified",
477 crate::notifications::NotificationChannel::Signal => "signal_verified",
478 };
479
480 let update_query = format!(
481 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1",
482 verified_column
483 );
484
485 if let Err(e) = sqlx::query(&update_query)
486 .bind(&input.did)
487 .execute(&state.db)
488 .await
489 {
490 error!("Failed to update verification status: {:?}", e);
491 return ApiError::InternalError.into_response();
492 }
493
494 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
495 Ok(m) => m,
496 Err(e) => {
497 error!("Failed to create access token: {:?}", e);
498 return ApiError::InternalError.into_response();
499 }
500 };
501
502 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
503 Ok(m) => m,
504 Err(e) => {
505 error!("Failed to create refresh token: {:?}", e);
506 return ApiError::InternalError.into_response();
507 }
508 };
509
510 if let Err(e) = sqlx::query!(
511 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
512 row.did,
513 access_meta.jti,
514 refresh_meta.jti,
515 access_meta.expires_at,
516 refresh_meta.expires_at
517 )
518 .execute(&state.db)
519 .await
520 {
521 error!("Failed to insert session: {:?}", e);
522 return ApiError::InternalError.into_response();
523 }
524
525 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
526 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await {
527 warn!("Failed to enqueue welcome notification: {:?}", e);
528 }
529
530 Json(ConfirmSignupOutput {
531 access_jwt: access_meta.token,
532 refresh_jwt: refresh_meta.token,
533 handle: row.handle,
534 did: row.did,
535 }).into_response()
536}
537
538#[derive(Deserialize)]
539#[serde(rename_all = "camelCase")]
540pub struct ResendVerificationInput {
541 pub did: String,
542}
543
544pub async fn resend_verification(
545 State(state): State<AppState>,
546 Json(input): Json<ResendVerificationInput>,
547) -> Response {
548 info!("resend_verification called for DID: {}", input.did);
549
550 let row = match sqlx::query!(
551 r#"SELECT
552 id, handle, email,
553 preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
554 discord_id, telegram_username, signal_number,
555 email_confirmed, discord_verified, telegram_verified, signal_verified
556 FROM users
557 WHERE did = $1"#,
558 input.did
559 )
560 .fetch_optional(&state.db)
561 .await
562 {
563 Ok(Some(row)) => row,
564 Ok(None) => {
565 return ApiError::InvalidRequest("User not found".into()).into_response();
566 }
567 Err(e) => {
568 error!("Database error in resend_verification: {:?}", e);
569 return ApiError::InternalError.into_response();
570 }
571 };
572
573 let is_verified = row.email_confirmed
574 || row.discord_verified
575 || row.telegram_verified
576 || row.signal_verified;
577
578 if is_verified {
579 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
580 }
581
582 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
583 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
584
585 if let Err(e) = sqlx::query!(
586 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3",
587 verification_code,
588 code_expires_at,
589 input.did
590 )
591 .execute(&state.db)
592 .await
593 {
594 error!("Failed to update verification code: {:?}", e);
595 return ApiError::InternalError.into_response();
596 }
597
598 let (channel_str, recipient) = match row.channel {
599 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()),
600 crate::notifications::NotificationChannel::Discord => {
601 ("discord", row.discord_id.unwrap_or_default())
602 }
603 crate::notifications::NotificationChannel::Telegram => {
604 ("telegram", row.telegram_username.unwrap_or_default())
605 }
606 crate::notifications::NotificationChannel::Signal => {
607 ("signal", row.signal_number.unwrap_or_default())
608 }
609 };
610
611 if let Err(e) = crate::notifications::enqueue_signup_verification(
612 &state.db,
613 row.id,
614 channel_str,
615 &recipient,
616 &verification_code,
617 ).await {
618 warn!("Failed to enqueue verification notification: {:?}", e);
619 }
620
621 Json(json!({"success": true})).into_response()
622}