this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use bcrypt::verify;
9use chrono::{Duration, Utc};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::{error, info, warn};
13use uuid::Uuid;
14
15#[derive(Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct CheckAccountStatusOutput {
18 pub activated: bool,
19 pub valid_did: bool,
20 pub repo_commit: String,
21 pub repo_rev: String,
22 pub repo_blocks: i64,
23 pub indexed_records: i64,
24 pub private_state_values: i64,
25 pub expected_blobs: i64,
26 pub imported_blobs: i64,
27}
28
29pub async fn check_account_status(
30 State(state): State<AppState>,
31 headers: axum::http::HeaderMap,
32) -> Response {
33 let auth_header = headers.get("Authorization");
34 if auth_header.is_none() {
35 return (
36 StatusCode::UNAUTHORIZED,
37 Json(json!({"error": "AuthenticationRequired"})),
38 )
39 .into_response();
40 }
41
42 let token = auth_header
43 .unwrap()
44 .to_str()
45 .unwrap_or("")
46 .replace("Bearer ", "");
47
48 let session = sqlx::query!(
49 r#"
50 SELECT s.did, k.key_bytes, u.id as user_id
51 FROM sessions s
52 JOIN users u ON s.did = u.did
53 JOIN user_keys k ON u.id = k.user_id
54 WHERE s.access_jwt = $1
55 "#,
56 token
57 )
58 .fetch_optional(&state.db)
59 .await;
60
61 let (did, key_bytes, user_id) = match session {
62 Ok(Some(row)) => (row.did, row.key_bytes, row.user_id),
63 Ok(None) => {
64 return (
65 StatusCode::UNAUTHORIZED,
66 Json(json!({"error": "AuthenticationFailed"})),
67 )
68 .into_response();
69 }
70 Err(e) => {
71 error!("DB error in check_account_status: {:?}", e);
72 return (
73 StatusCode::INTERNAL_SERVER_ERROR,
74 Json(json!({"error": "InternalError"})),
75 )
76 .into_response();
77 }
78 };
79
80 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
81 return (
82 StatusCode::UNAUTHORIZED,
83 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
84 )
85 .into_response();
86 }
87
88 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
89 .fetch_optional(&state.db)
90 .await;
91
92 let deactivated_at = match user_status {
93 Ok(Some(row)) => row.deactivated_at,
94 _ => None,
95 };
96
97 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
98 .fetch_optional(&state.db)
99 .await;
100
101 let repo_commit = match repo_result {
102 Ok(Some(row)) => row.repo_root_cid,
103 _ => String::new(),
104 };
105
106 let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
107 .fetch_one(&state.db)
108 .await
109 .unwrap_or(Some(0))
110 .unwrap_or(0);
111
112 let blob_count: i64 =
113 sqlx::query_scalar!("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", user_id)
114 .fetch_one(&state.db)
115 .await
116 .unwrap_or(Some(0))
117 .unwrap_or(0);
118
119 let valid_did = did.starts_with("did:");
120
121 (
122 StatusCode::OK,
123 Json(CheckAccountStatusOutput {
124 activated: deactivated_at.is_none(),
125 valid_did,
126 repo_commit: repo_commit.clone(),
127 repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
128 repo_blocks: 0,
129 indexed_records: record_count,
130 private_state_values: 0,
131 expected_blobs: blob_count,
132 imported_blobs: blob_count,
133 }),
134 )
135 .into_response()
136}
137
138pub async fn activate_account(
139 State(state): State<AppState>,
140 headers: axum::http::HeaderMap,
141) -> Response {
142 let auth_header = headers.get("Authorization");
143 if auth_header.is_none() {
144 return (
145 StatusCode::UNAUTHORIZED,
146 Json(json!({"error": "AuthenticationRequired"})),
147 )
148 .into_response();
149 }
150
151 let token = auth_header
152 .unwrap()
153 .to_str()
154 .unwrap_or("")
155 .replace("Bearer ", "");
156
157 let session = sqlx::query!(
158 r#"
159 SELECT s.did, k.key_bytes
160 FROM sessions s
161 JOIN users u ON s.did = u.did
162 JOIN user_keys k ON u.id = k.user_id
163 WHERE s.access_jwt = $1
164 "#,
165 token
166 )
167 .fetch_optional(&state.db)
168 .await;
169
170 let (did, key_bytes) = match session {
171 Ok(Some(row)) => (row.did, row.key_bytes),
172 Ok(None) => {
173 return (
174 StatusCode::UNAUTHORIZED,
175 Json(json!({"error": "AuthenticationFailed"})),
176 )
177 .into_response();
178 }
179 Err(e) => {
180 error!("DB error in activate_account: {:?}", e);
181 return (
182 StatusCode::INTERNAL_SERVER_ERROR,
183 Json(json!({"error": "InternalError"})),
184 )
185 .into_response();
186 }
187 };
188
189 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
190 return (
191 StatusCode::UNAUTHORIZED,
192 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
193 )
194 .into_response();
195 }
196
197 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
198 .execute(&state.db)
199 .await;
200
201 match result {
202 Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
203 Err(e) => {
204 error!("DB error activating account: {:?}", e);
205 (
206 StatusCode::INTERNAL_SERVER_ERROR,
207 Json(json!({"error": "InternalError"})),
208 )
209 .into_response()
210 }
211 }
212}
213
214#[derive(Deserialize)]
215#[serde(rename_all = "camelCase")]
216pub struct DeactivateAccountInput {
217 pub delete_after: Option<String>,
218}
219
220pub async fn deactivate_account(
221 State(state): State<AppState>,
222 headers: axum::http::HeaderMap,
223 Json(_input): Json<DeactivateAccountInput>,
224) -> Response {
225 let auth_header = headers.get("Authorization");
226 if auth_header.is_none() {
227 return (
228 StatusCode::UNAUTHORIZED,
229 Json(json!({"error": "AuthenticationRequired"})),
230 )
231 .into_response();
232 }
233
234 let token = auth_header
235 .unwrap()
236 .to_str()
237 .unwrap_or("")
238 .replace("Bearer ", "");
239
240 let session = sqlx::query!(
241 r#"
242 SELECT s.did, k.key_bytes
243 FROM sessions s
244 JOIN users u ON s.did = u.did
245 JOIN user_keys k ON u.id = k.user_id
246 WHERE s.access_jwt = $1
247 "#,
248 token
249 )
250 .fetch_optional(&state.db)
251 .await;
252
253 let (did, key_bytes) = match session {
254 Ok(Some(row)) => (row.did, row.key_bytes),
255 Ok(None) => {
256 return (
257 StatusCode::UNAUTHORIZED,
258 Json(json!({"error": "AuthenticationFailed"})),
259 )
260 .into_response();
261 }
262 Err(e) => {
263 error!("DB error in deactivate_account: {:?}", e);
264 return (
265 StatusCode::INTERNAL_SERVER_ERROR,
266 Json(json!({"error": "InternalError"})),
267 )
268 .into_response();
269 }
270 };
271
272 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
273 return (
274 StatusCode::UNAUTHORIZED,
275 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
276 )
277 .into_response();
278 }
279
280 let result = sqlx::query!("UPDATE users SET deactivated_at = NOW() WHERE did = $1", did)
281 .execute(&state.db)
282 .await;
283
284 match result {
285 Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
286 Err(e) => {
287 error!("DB error deactivating account: {:?}", e);
288 (
289 StatusCode::INTERNAL_SERVER_ERROR,
290 Json(json!({"error": "InternalError"})),
291 )
292 .into_response()
293 }
294 }
295}
296
297pub async fn request_account_delete(
298 State(state): State<AppState>,
299 headers: axum::http::HeaderMap,
300) -> Response {
301 let auth_header = headers.get("Authorization");
302 if auth_header.is_none() {
303 return (
304 StatusCode::UNAUTHORIZED,
305 Json(json!({"error": "AuthenticationRequired"})),
306 )
307 .into_response();
308 }
309
310 let token = auth_header
311 .unwrap()
312 .to_str()
313 .unwrap_or("")
314 .replace("Bearer ", "");
315
316 let session = sqlx::query!(
317 r#"
318 SELECT s.did, u.id as user_id, u.email, u.handle, k.key_bytes
319 FROM sessions s
320 JOIN users u ON s.did = u.did
321 JOIN user_keys k ON u.id = k.user_id
322 WHERE s.access_jwt = $1
323 "#,
324 token
325 )
326 .fetch_optional(&state.db)
327 .await;
328
329 let (did, user_id, email, handle, key_bytes) = match session {
330 Ok(Some(row)) => (row.did, row.user_id, row.email, row.handle, row.key_bytes),
331 Ok(None) => {
332 return (
333 StatusCode::UNAUTHORIZED,
334 Json(json!({"error": "AuthenticationFailed"})),
335 )
336 .into_response();
337 }
338 Err(e) => {
339 error!("DB error in request_account_delete: {:?}", e);
340 return (
341 StatusCode::INTERNAL_SERVER_ERROR,
342 Json(json!({"error": "InternalError"})),
343 )
344 .into_response();
345 }
346 };
347
348 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
349 return (
350 StatusCode::UNAUTHORIZED,
351 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
352 )
353 .into_response();
354 }
355
356 let confirmation_token = Uuid::new_v4().to_string();
357 let expires_at = Utc::now() + Duration::minutes(15);
358
359 let insert = sqlx::query!(
360 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
361 confirmation_token,
362 did,
363 expires_at
364 )
365 .execute(&state.db)
366 .await;
367
368 if let Err(e) = insert {
369 error!("DB error creating deletion token: {:?}", e);
370 return (
371 StatusCode::INTERNAL_SERVER_ERROR,
372 Json(json!({"error": "InternalError"})),
373 )
374 .into_response();
375 }
376
377 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
378 if let Err(e) = crate::notifications::enqueue_account_deletion(
379 &state.db,
380 user_id,
381 &email,
382 &handle,
383 &confirmation_token,
384 &hostname,
385 )
386 .await
387 {
388 warn!("Failed to enqueue account deletion notification: {:?}", e);
389 }
390
391 info!("Account deletion requested for user {}", did);
392
393 (StatusCode::OK, Json(json!({}))).into_response()
394}
395
396#[derive(Deserialize)]
397pub struct DeleteAccountInput {
398 pub did: String,
399 pub password: String,
400 pub token: String,
401}
402
403pub async fn delete_account(
404 State(state): State<AppState>,
405 Json(input): Json<DeleteAccountInput>,
406) -> Response {
407 let did = input.did.trim();
408 let password = &input.password;
409 let token = input.token.trim();
410
411 if did.is_empty() {
412 return (
413 StatusCode::BAD_REQUEST,
414 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
415 )
416 .into_response();
417 }
418
419 if password.is_empty() {
420 return (
421 StatusCode::BAD_REQUEST,
422 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
423 )
424 .into_response();
425 }
426
427 if token.is_empty() {
428 return (
429 StatusCode::BAD_REQUEST,
430 Json(json!({"error": "InvalidToken", "message": "token is required"})),
431 )
432 .into_response();
433 }
434
435 let user = sqlx::query!(
436 "SELECT id, password_hash FROM users WHERE did = $1",
437 did
438 )
439 .fetch_optional(&state.db)
440 .await;
441
442 let (user_id, password_hash) = match user {
443 Ok(Some(row)) => (row.id, row.password_hash),
444 Ok(None) => {
445 return (
446 StatusCode::BAD_REQUEST,
447 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
448 )
449 .into_response();
450 }
451 Err(e) => {
452 error!("DB error in delete_account: {:?}", e);
453 return (
454 StatusCode::INTERNAL_SERVER_ERROR,
455 Json(json!({"error": "InternalError"})),
456 )
457 .into_response();
458 }
459 };
460
461 let password_valid = if verify(password, &password_hash).unwrap_or(false) {
462 true
463 } else {
464 let app_pass_rows = sqlx::query!(
465 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
466 user_id
467 )
468 .fetch_all(&state.db)
469 .await
470 .unwrap_or_default();
471
472 app_pass_rows
473 .iter()
474 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
475 };
476
477 if !password_valid {
478 return (
479 StatusCode::UNAUTHORIZED,
480 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
481 )
482 .into_response();
483 }
484
485 let deletion_request = sqlx::query!(
486 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
487 token
488 )
489 .fetch_optional(&state.db)
490 .await;
491
492 let (token_did, expires_at) = match deletion_request {
493 Ok(Some(row)) => (row.did, row.expires_at),
494 Ok(None) => {
495 return (
496 StatusCode::BAD_REQUEST,
497 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
498 )
499 .into_response();
500 }
501 Err(e) => {
502 error!("DB error fetching deletion token: {:?}", e);
503 return (
504 StatusCode::INTERNAL_SERVER_ERROR,
505 Json(json!({"error": "InternalError"})),
506 )
507 .into_response();
508 }
509 };
510
511 if token_did != did {
512 return (
513 StatusCode::BAD_REQUEST,
514 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
515 )
516 .into_response();
517 }
518
519 if Utc::now() > expires_at {
520 let _ = sqlx::query!("DELETE FROM account_deletion_requests WHERE token = $1", token)
521 .execute(&state.db)
522 .await;
523
524 return (
525 StatusCode::BAD_REQUEST,
526 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
527 )
528 .into_response();
529 }
530
531 let mut tx = match state.db.begin().await {
532 Ok(tx) => tx,
533 Err(e) => {
534 error!("Failed to begin transaction: {:?}", e);
535 return (
536 StatusCode::INTERNAL_SERVER_ERROR,
537 Json(json!({"error": "InternalError"})),
538 )
539 .into_response();
540 }
541 };
542
543 let deletion_result: Result<(), sqlx::Error> = async {
544 sqlx::query!("DELETE FROM sessions WHERE did = $1", did)
545 .execute(&mut *tx)
546 .await?;
547
548 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
549 .execute(&mut *tx)
550 .await?;
551
552 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
553 .execute(&mut *tx)
554 .await?;
555
556 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
557 .execute(&mut *tx)
558 .await?;
559
560 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
561 .execute(&mut *tx)
562 .await?;
563
564 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
565 .execute(&mut *tx)
566 .await?;
567
568 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
569 .execute(&mut *tx)
570 .await?;
571
572 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
573 .execute(&mut *tx)
574 .await?;
575
576 Ok(())
577 }
578 .await;
579
580 match deletion_result {
581 Ok(()) => {
582 if let Err(e) = tx.commit().await {
583 error!("Failed to commit account deletion transaction: {:?}", e);
584 return (
585 StatusCode::INTERNAL_SERVER_ERROR,
586 Json(json!({"error": "InternalError"})),
587 )
588 .into_response();
589 }
590 info!("Account {} deleted successfully", did);
591 (StatusCode::OK, Json(json!({}))).into_response()
592 }
593 Err(e) => {
594 error!("DB error deleting account, rolling back: {:?}", e);
595 (
596 StatusCode::INTERNAL_SERVER_ERROR,
597 Json(json!({"error": "InternalError"})),
598 )
599 .into_response()
600 }
601 }
602}