this repo has no description
1use crate::api::ApiError;
2use crate::auth::BearerAuth;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::Utc;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tracing::{error, info, warn};
15
16fn extract_client_ip(headers: &HeaderMap) -> String {
17 if let Some(forwarded) = headers.get("x-forwarded-for") {
18 if let Ok(value) = forwarded.to_str() {
19 if let Some(first_ip) = value.split(',').next() {
20 return first_ip.trim().to_string();
21 }
22 }
23 }
24 if let Some(real_ip) = headers.get("x-real-ip") {
25 if let Ok(value) = real_ip.to_str() {
26 return value.trim().to_string();
27 }
28 }
29 "unknown".to_string()
30}
31
32#[derive(Deserialize)]
33pub struct CreateSessionInput {
34 pub identifier: String,
35 pub password: String,
36}
37
38#[derive(Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateSessionOutput {
41 pub access_jwt: String,
42 pub refresh_jwt: String,
43 pub handle: String,
44 pub did: String,
45}
46
47pub async fn create_session(
48 State(state): State<AppState>,
49 headers: HeaderMap,
50 Json(input): Json<CreateSessionInput>,
51) -> Response {
52 info!("create_session called");
53
54 let client_ip = extract_client_ip(&headers);
55 if state.rate_limiters.login.check_key(&client_ip).is_err() {
56 warn!(ip = %client_ip, "Login rate limit exceeded");
57 return (
58 StatusCode::TOO_MANY_REQUESTS,
59 Json(json!({
60 "error": "RateLimitExceeded",
61 "message": "Too many login attempts. Please try again later."
62 })),
63 )
64 .into_response();
65 }
66
67 let row = match sqlx::query!(
68 r#"SELECT
69 u.id, u.did, u.handle, u.password_hash,
70 u.email_confirmed, u.discord_verified, u.telegram_verified, u.signal_verified,
71 k.key_bytes, k.encryption_version
72 FROM users u
73 JOIN user_keys k ON u.id = k.user_id
74 WHERE u.handle = $1 OR u.email = $1"#,
75 input.identifier
76 )
77 .fetch_optional(&state.db)
78 .await
79 {
80 Ok(Some(row)) => row,
81 Ok(None) => {
82 let _ = verify(&input.password, "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYw1ZzQKZqmK");
83 warn!("User not found for login attempt");
84 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
85 }
86 Err(e) => {
87 error!("Database error fetching user: {:?}", e);
88 return ApiError::InternalError.into_response();
89 }
90 };
91
92 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
93 Ok(k) => k,
94 Err(e) => {
95 error!("Failed to decrypt user key: {:?}", e);
96 return ApiError::InternalError.into_response();
97 }
98 };
99
100 let password_valid = verify(&input.password, &row.password_hash).unwrap_or(false)
101 || sqlx::query!("SELECT password_hash FROM app_passwords WHERE user_id = $1", row.id)
102 .fetch_all(&state.db)
103 .await
104 .unwrap_or_default()
105 .iter()
106 .any(|app| verify(&input.password, &app.password_hash).unwrap_or(false));
107
108 if !password_valid {
109 warn!("Password verification failed for login attempt");
110 return ApiError::AuthenticationFailedMsg("Invalid identifier or password".into()).into_response();
111 }
112
113 let is_verified = row.email_confirmed
114 || row.discord_verified
115 || row.telegram_verified
116 || row.signal_verified;
117
118 if !is_verified {
119 warn!("Login attempt for unverified account: {}", row.did);
120 return (
121 StatusCode::FORBIDDEN,
122 Json(json!({
123 "error": "AccountNotVerified",
124 "message": "Please verify your account before logging in",
125 "did": row.did
126 })),
127 ).into_response();
128 }
129
130 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
131 Ok(m) => m,
132 Err(e) => {
133 error!("Failed to create access token: {:?}", e);
134 return ApiError::InternalError.into_response();
135 }
136 };
137
138 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
139 Ok(m) => m,
140 Err(e) => {
141 error!("Failed to create refresh token: {:?}", e);
142 return ApiError::InternalError.into_response();
143 }
144 };
145
146 if let Err(e) = sqlx::query!(
147 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
148 row.did,
149 access_meta.jti,
150 refresh_meta.jti,
151 access_meta.expires_at,
152 refresh_meta.expires_at
153 )
154 .execute(&state.db)
155 .await
156 {
157 error!("Failed to insert session: {:?}", e);
158 return ApiError::InternalError.into_response();
159 }
160
161 Json(CreateSessionOutput {
162 access_jwt: access_meta.token,
163 refresh_jwt: refresh_meta.token,
164 handle: row.handle,
165 did: row.did,
166 }).into_response()
167}
168
169pub async fn get_session(
170 State(state): State<AppState>,
171 BearerAuth(auth_user): BearerAuth,
172) -> Response {
173 match sqlx::query!("SELECT handle, email FROM users WHERE did = $1", auth_user.did)
174 .fetch_optional(&state.db)
175 .await
176 {
177 Ok(Some(row)) => Json(json!({
178 "handle": row.handle,
179 "did": auth_user.did,
180 "email": row.email,
181 "didDoc": {}
182 })).into_response(),
183 Ok(None) => ApiError::AuthenticationFailed.into_response(),
184 Err(e) => {
185 error!("Database error in get_session: {:?}", e);
186 ApiError::InternalError.into_response()
187 }
188 }
189}
190
191pub async fn delete_session(
192 State(state): State<AppState>,
193 headers: axum::http::HeaderMap,
194) -> Response {
195 let token = match crate::auth::extract_bearer_token_from_header(
196 headers.get("Authorization").and_then(|h| h.to_str().ok())
197 ) {
198 Some(t) => t,
199 None => return ApiError::AuthenticationRequired.into_response(),
200 };
201
202 let jti = match crate::auth::get_jti_from_token(&token) {
203 Ok(jti) => jti,
204 Err(_) => return ApiError::AuthenticationFailed.into_response(),
205 };
206
207 match sqlx::query!("DELETE FROM session_tokens WHERE access_jti = $1", jti)
208 .execute(&state.db)
209 .await
210 {
211 Ok(res) if res.rows_affected() > 0 => Json(json!({})).into_response(),
212 Ok(_) => ApiError::AuthenticationFailed.into_response(),
213 Err(e) => {
214 error!("Database error in delete_session: {:?}", e);
215 ApiError::AuthenticationFailed.into_response()
216 }
217 }
218}
219
220pub async fn refresh_session(
221 State(state): State<AppState>,
222 headers: axum::http::HeaderMap,
223) -> Response {
224 let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
225 if !state.distributed_rate_limiter.check_rate_limit(
226 &format!("refresh_session:{}", client_ip),
227 60,
228 60_000,
229 ).await {
230 if state.rate_limiters.refresh_session.check_key(&client_ip).is_err() {
231 tracing::warn!(ip = %client_ip, "Refresh session rate limit exceeded");
232 return (
233 axum::http::StatusCode::TOO_MANY_REQUESTS,
234 axum::Json(serde_json::json!({
235 "error": "RateLimitExceeded",
236 "message": "Too many requests. Please try again later."
237 })),
238 ).into_response();
239 }
240 }
241
242 let refresh_token = match crate::auth::extract_bearer_token_from_header(
243 headers.get("Authorization").and_then(|h| h.to_str().ok())
244 ) {
245 Some(t) => t,
246 None => return ApiError::AuthenticationRequired.into_response(),
247 };
248
249 let refresh_jti = match crate::auth::get_jti_from_token(&refresh_token) {
250 Ok(jti) => jti,
251 Err(_) => return ApiError::AuthenticationFailedMsg("Invalid token format".into()).into_response(),
252 };
253
254 let mut tx = match state.db.begin().await {
255 Ok(tx) => tx,
256 Err(e) => {
257 error!("Failed to begin transaction: {:?}", e);
258 return ApiError::InternalError.into_response();
259 }
260 };
261
262 if let Ok(Some(session_id)) = sqlx::query_scalar!(
263 "SELECT session_id FROM used_refresh_tokens WHERE refresh_jti = $1 FOR UPDATE",
264 refresh_jti
265 )
266 .fetch_optional(&mut *tx)
267 .await
268 {
269 warn!("Refresh token reuse detected! Revoking token family for session_id: {}", session_id);
270 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_id)
271 .execute(&mut *tx)
272 .await;
273 let _ = tx.commit().await;
274 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
275 }
276
277 let session_row = match sqlx::query!(
278 r#"SELECT st.id, st.did, k.key_bytes, k.encryption_version
279 FROM session_tokens st
280 JOIN users u ON st.did = u.did
281 JOIN user_keys k ON u.id = k.user_id
282 WHERE st.refresh_jti = $1 AND st.refresh_expires_at > NOW()
283 FOR UPDATE OF st"#,
284 refresh_jti
285 )
286 .fetch_optional(&mut *tx)
287 .await
288 {
289 Ok(Some(row)) => row,
290 Ok(None) => return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response(),
291 Err(e) => {
292 error!("Database error fetching session: {:?}", e);
293 return ApiError::InternalError.into_response();
294 }
295 };
296
297 let key_bytes = match crate::config::decrypt_key(&session_row.key_bytes, session_row.encryption_version) {
298 Ok(k) => k,
299 Err(e) => {
300 error!("Failed to decrypt user key: {:?}", e);
301 return ApiError::InternalError.into_response();
302 }
303 };
304
305 if crate::auth::verify_refresh_token(&refresh_token, &key_bytes).is_err() {
306 return ApiError::AuthenticationFailedMsg("Invalid refresh token".into()).into_response();
307 }
308
309 let new_access_meta = match crate::auth::create_access_token_with_metadata(&session_row.did, &key_bytes) {
310 Ok(m) => m,
311 Err(e) => {
312 error!("Failed to create access token: {:?}", e);
313 return ApiError::InternalError.into_response();
314 }
315 };
316
317 let new_refresh_meta = match crate::auth::create_refresh_token_with_metadata(&session_row.did, &key_bytes) {
318 Ok(m) => m,
319 Err(e) => {
320 error!("Failed to create refresh token: {:?}", e);
321 return ApiError::InternalError.into_response();
322 }
323 };
324
325 match sqlx::query!(
326 "INSERT INTO used_refresh_tokens (refresh_jti, session_id) VALUES ($1, $2) ON CONFLICT (refresh_jti) DO NOTHING",
327 refresh_jti,
328 session_row.id
329 )
330 .execute(&mut *tx)
331 .await
332 {
333 Ok(result) if result.rows_affected() == 0 => {
334 warn!("Concurrent refresh token reuse detected for session_id: {}", session_row.id);
335 let _ = sqlx::query!("DELETE FROM session_tokens WHERE id = $1", session_row.id)
336 .execute(&mut *tx)
337 .await;
338 let _ = tx.commit().await;
339 return ApiError::ExpiredTokenMsg("Refresh token has been revoked due to suspected compromise".into()).into_response();
340 }
341 Err(e) => {
342 error!("Failed to record used refresh token: {:?}", e);
343 return ApiError::InternalError.into_response();
344 }
345 Ok(_) => {}
346 }
347
348 if let Err(e) = sqlx::query!(
349 "UPDATE session_tokens SET access_jti = $1, refresh_jti = $2, access_expires_at = $3, refresh_expires_at = $4, updated_at = NOW() WHERE id = $5",
350 new_access_meta.jti,
351 new_refresh_meta.jti,
352 new_access_meta.expires_at,
353 new_refresh_meta.expires_at,
354 session_row.id
355 )
356 .execute(&mut *tx)
357 .await
358 {
359 error!("Database error updating session: {:?}", e);
360 return ApiError::InternalError.into_response();
361 }
362
363 if let Err(e) = tx.commit().await {
364 error!("Failed to commit transaction: {:?}", e);
365 return ApiError::InternalError.into_response();
366 }
367
368 match sqlx::query!("SELECT handle FROM users WHERE did = $1", session_row.did)
369 .fetch_optional(&state.db)
370 .await
371 {
372 Ok(Some(u)) => Json(json!({
373 "accessJwt": new_access_meta.token,
374 "refreshJwt": new_refresh_meta.token,
375 "handle": u.handle,
376 "did": session_row.did
377 })).into_response(),
378 Ok(None) => {
379 error!("User not found for existing session: {}", session_row.did);
380 ApiError::InternalError.into_response()
381 }
382 Err(e) => {
383 error!("Database error fetching user: {:?}", e);
384 ApiError::InternalError.into_response()
385 }
386 }
387}
388
389#[derive(Deserialize)]
390#[serde(rename_all = "camelCase")]
391pub struct ConfirmSignupInput {
392 pub did: String,
393 pub verification_code: String,
394}
395
396#[derive(Serialize)]
397#[serde(rename_all = "camelCase")]
398pub struct ConfirmSignupOutput {
399 pub access_jwt: String,
400 pub refresh_jwt: String,
401 pub handle: String,
402 pub did: String,
403}
404
405pub async fn confirm_signup(
406 State(state): State<AppState>,
407 Json(input): Json<ConfirmSignupInput>,
408) -> Response {
409 info!("confirm_signup called for DID: {}", input.did);
410
411 let row = match sqlx::query!(
412 r#"SELECT
413 u.id, u.did, u.handle,
414 u.email_confirmation_code,
415 u.email_confirmation_code_expires_at,
416 u.preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
417 k.key_bytes, k.encryption_version
418 FROM users u
419 JOIN user_keys k ON u.id = k.user_id
420 WHERE u.did = $1"#,
421 input.did
422 )
423 .fetch_optional(&state.db)
424 .await
425 {
426 Ok(Some(row)) => row,
427 Ok(None) => {
428 warn!("User not found for confirm_signup: {}", input.did);
429 return ApiError::InvalidRequest("Invalid DID or verification code".into()).into_response();
430 }
431 Err(e) => {
432 error!("Database error in confirm_signup: {:?}", e);
433 return ApiError::InternalError.into_response();
434 }
435 };
436
437 let stored_code = match &row.email_confirmation_code {
438 Some(code) => code,
439 None => {
440 warn!("No verification code found for user: {}", input.did);
441 return ApiError::InvalidRequest("No pending verification".into()).into_response();
442 }
443 };
444
445 if stored_code != &input.verification_code {
446 warn!("Invalid verification code for user: {}", input.did);
447 return ApiError::InvalidRequest("Invalid verification code".into()).into_response();
448 }
449
450 if let Some(expires_at) = row.email_confirmation_code_expires_at {
451 if expires_at < Utc::now() {
452 warn!("Verification code expired for user: {}", input.did);
453 return ApiError::ExpiredTokenMsg("Verification code has expired".into()).into_response();
454 }
455 }
456
457 let key_bytes = match crate::config::decrypt_key(&row.key_bytes, row.encryption_version) {
458 Ok(k) => k,
459 Err(e) => {
460 error!("Failed to decrypt user key: {:?}", e);
461 return ApiError::InternalError.into_response();
462 }
463 };
464
465 let verified_column = match row.channel {
466 crate::notifications::NotificationChannel::Email => "email_confirmed",
467 crate::notifications::NotificationChannel::Discord => "discord_verified",
468 crate::notifications::NotificationChannel::Telegram => "telegram_verified",
469 crate::notifications::NotificationChannel::Signal => "signal_verified",
470 };
471
472 let update_query = format!(
473 "UPDATE users SET {} = TRUE, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE did = $1",
474 verified_column
475 );
476
477 if let Err(e) = sqlx::query(&update_query)
478 .bind(&input.did)
479 .execute(&state.db)
480 .await
481 {
482 error!("Failed to update verification status: {:?}", e);
483 return ApiError::InternalError.into_response();
484 }
485
486 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) {
487 Ok(m) => m,
488 Err(e) => {
489 error!("Failed to create access token: {:?}", e);
490 return ApiError::InternalError.into_response();
491 }
492 };
493
494 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&row.did, &key_bytes) {
495 Ok(m) => m,
496 Err(e) => {
497 error!("Failed to create refresh token: {:?}", e);
498 return ApiError::InternalError.into_response();
499 }
500 };
501
502 if let Err(e) = sqlx::query!(
503 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
504 row.did,
505 access_meta.jti,
506 refresh_meta.jti,
507 access_meta.expires_at,
508 refresh_meta.expires_at
509 )
510 .execute(&state.db)
511 .await
512 {
513 error!("Failed to insert session: {:?}", e);
514 return ApiError::InternalError.into_response();
515 }
516
517 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
518 if let Err(e) = crate::notifications::enqueue_welcome(&state.db, row.id, &hostname).await {
519 warn!("Failed to enqueue welcome notification: {:?}", e);
520 }
521
522 Json(ConfirmSignupOutput {
523 access_jwt: access_meta.token,
524 refresh_jwt: refresh_meta.token,
525 handle: row.handle,
526 did: row.did,
527 }).into_response()
528}
529
530#[derive(Deserialize)]
531#[serde(rename_all = "camelCase")]
532pub struct ResendVerificationInput {
533 pub did: String,
534}
535
536pub async fn resend_verification(
537 State(state): State<AppState>,
538 Json(input): Json<ResendVerificationInput>,
539) -> Response {
540 info!("resend_verification called for DID: {}", input.did);
541
542 let row = match sqlx::query!(
543 r#"SELECT
544 id, handle, email,
545 preferred_notification_channel as "channel: crate::notifications::NotificationChannel",
546 discord_id, telegram_username, signal_number,
547 email_confirmed, discord_verified, telegram_verified, signal_verified
548 FROM users
549 WHERE did = $1"#,
550 input.did
551 )
552 .fetch_optional(&state.db)
553 .await
554 {
555 Ok(Some(row)) => row,
556 Ok(None) => {
557 return ApiError::InvalidRequest("User not found".into()).into_response();
558 }
559 Err(e) => {
560 error!("Database error in resend_verification: {:?}", e);
561 return ApiError::InternalError.into_response();
562 }
563 };
564
565 let is_verified = row.email_confirmed
566 || row.discord_verified
567 || row.telegram_verified
568 || row.signal_verified;
569
570 if is_verified {
571 return ApiError::InvalidRequest("Account is already verified".into()).into_response();
572 }
573
574 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
575 let code_expires_at = Utc::now() + chrono::Duration::minutes(30);
576
577 if let Err(e) = sqlx::query!(
578 "UPDATE users SET email_confirmation_code = $1, email_confirmation_code_expires_at = $2 WHERE did = $3",
579 verification_code,
580 code_expires_at,
581 input.did
582 )
583 .execute(&state.db)
584 .await
585 {
586 error!("Failed to update verification code: {:?}", e);
587 return ApiError::InternalError.into_response();
588 }
589
590 let (channel_str, recipient) = match row.channel {
591 crate::notifications::NotificationChannel::Email => ("email", row.email.clone().unwrap_or_default()),
592 crate::notifications::NotificationChannel::Discord => {
593 ("discord", row.discord_id.unwrap_or_default())
594 }
595 crate::notifications::NotificationChannel::Telegram => {
596 ("telegram", row.telegram_username.unwrap_or_default())
597 }
598 crate::notifications::NotificationChannel::Signal => {
599 ("signal", row.signal_number.unwrap_or_default())
600 }
601 };
602
603 if let Err(e) = crate::notifications::enqueue_signup_verification(
604 &state.db,
605 row.id,
606 channel_str,
607 &recipient,
608 &verification_code,
609 ).await {
610 warn!("Failed to enqueue verification notification: {:?}", e);
611 }
612
613 Json(json!({"success": true})).into_response()
614}