this repo has no description
1use crate::api::ApiError;
2use crate::plc::PlcClient;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::{Duration, Utc};
12use cid::Cid;
13use jacquard_repo::commit::Commit;
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use tracing::{error, info, warn};
20use uuid::Uuid;
21
22#[derive(Serialize)]
23#[serde(rename_all = "camelCase")]
24pub struct CheckAccountStatusOutput {
25 pub activated: bool,
26 pub valid_did: bool,
27 pub repo_commit: String,
28 pub repo_rev: String,
29 pub repo_blocks: i64,
30 pub indexed_records: i64,
31 pub private_state_values: i64,
32 pub expected_blobs: i64,
33 pub imported_blobs: i64,
34}
35
36pub async fn check_account_status(
37 State(state): State<AppState>,
38 headers: axum::http::HeaderMap,
39) -> Response {
40 let extracted = match crate::auth::extract_auth_token_from_header(
41 headers.get("Authorization").and_then(|h| h.to_str().ok()),
42 ) {
43 Some(t) => t,
44 None => return ApiError::AuthenticationRequired.into_response(),
45 };
46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
47 let http_uri = format!(
48 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
50 );
51 let did = match crate::auth::validate_token_with_dpop(
52 &state.db,
53 &extracted.token,
54 extracted.is_dpop,
55 dpop_proof,
56 "GET",
57 &http_uri,
58 true,
59 )
60 .await
61 {
62 Ok(user) => user.did,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(id)) => id,
70 _ => {
71 return (
72 StatusCode::INTERNAL_SERVER_ERROR,
73 Json(json!({"error": "InternalError"})),
74 )
75 .into_response();
76 }
77 };
78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
79 .fetch_optional(&state.db)
80 .await;
81 let deactivated_at = match user_status {
82 Ok(Some(row)) => row.deactivated_at,
83 _ => None,
84 };
85 let repo_result = sqlx::query!(
86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
87 user_id
88 )
89 .fetch_optional(&state.db)
90 .await;
91 let (repo_commit, repo_rev_from_db) = match repo_result {
92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
93 _ => (String::new(), None),
94 };
95 let block_count: i64 =
96 sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id)
97 .fetch_one(&state.db)
98 .await
99 .unwrap_or(Some(0))
100 .unwrap_or(0);
101 let repo_rev = if let Some(rev) = repo_rev_from_db {
102 rev
103 } else if !repo_commit.is_empty() {
104 if let Ok(cid) = Cid::from_str(&repo_commit) {
105 if let Ok(Some(block)) = state.block_store.get(&cid).await {
106 Commit::from_cbor(&block)
107 .ok()
108 .map(|c| c.rev().to_string())
109 .unwrap_or_default()
110 } else {
111 String::new()
112 }
113 } else {
114 String::new()
115 }
116 } else {
117 String::new()
118 };
119 let record_count: i64 =
120 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
121 .fetch_one(&state.db)
122 .await
123 .unwrap_or(Some(0))
124 .unwrap_or(0);
125 let imported_blobs: i64 = sqlx::query_scalar!(
126 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
127 user_id
128 )
129 .fetch_one(&state.db)
130 .await
131 .unwrap_or(Some(0))
132 .unwrap_or(0);
133 let expected_blobs: i64 = sqlx::query_scalar!(
134 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
135 user_id
136 )
137 .fetch_one(&state.db)
138 .await
139 .unwrap_or(Some(0))
140 .unwrap_or(0);
141 let valid_did = is_valid_did_for_service(&state.db, &did).await;
142 (
143 StatusCode::OK,
144 Json(CheckAccountStatusOutput {
145 activated: deactivated_at.is_none(),
146 valid_did,
147 repo_commit: repo_commit.clone(),
148 repo_rev,
149 repo_blocks: block_count as i64,
150 indexed_records: record_count,
151 private_state_values: 0,
152 expected_blobs,
153 imported_blobs,
154 }),
155 )
156 .into_response()
157}
158
159async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool {
160 assert_valid_did_document_for_service(db, did, false)
161 .await
162 .is_ok()
163}
164
165async fn assert_valid_did_document_for_service(
166 db: &sqlx::PgPool,
167 did: &str,
168 with_retry: bool,
169) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
170 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
171 let expected_endpoint = format!("https://{}", hostname);
172
173 if did.starts_with("did:plc:") {
174 let plc_client = PlcClient::new(None);
175
176 let max_attempts = if with_retry { 5 } else { 1 };
177 let mut last_error = None;
178 let mut doc_data = None;
179 for attempt in 0..max_attempts {
180 if attempt > 0 {
181 let delay_ms = 500 * (1 << (attempt - 1));
182 info!(
183 "Waiting {}ms before retry {} for DID document validation ({})",
184 delay_ms, attempt, did
185 );
186 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
187 }
188
189 match plc_client.get_document_data(did).await {
190 Ok(data) => {
191 let pds_endpoint = data
192 .get("services")
193 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
194 .and_then(|p| p.get("endpoint"))
195 .and_then(|e| e.as_str());
196
197 if pds_endpoint == Some(&expected_endpoint) {
198 doc_data = Some(data);
199 break;
200 } else {
201 info!(
202 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
203 attempt + 1,
204 did,
205 pds_endpoint,
206 expected_endpoint
207 );
208 last_error = Some(format!(
209 "DID document endpoint {:?} does not match expected {}",
210 pds_endpoint, expected_endpoint
211 ));
212 }
213 }
214 Err(e) => {
215 warn!(
216 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
217 attempt + 1,
218 did,
219 e
220 );
221 last_error = Some(format!("Could not resolve DID document: {}", e));
222 }
223 }
224 }
225
226 let doc_data = match doc_data {
227 Some(d) => d,
228 None => {
229 return Err((
230 StatusCode::BAD_REQUEST,
231 Json(json!({
232 "error": "InvalidRequest",
233 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string())
234 })),
235 ));
236 }
237 };
238
239 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
240 if let Some(ref expected_rotation_key) = server_rotation_key {
241 let rotation_keys = doc_data
242 .get("rotationKeys")
243 .and_then(|v| v.as_array())
244 .map(|arr| {
245 arr.iter()
246 .filter_map(|k| k.as_str())
247 .collect::<Vec<_>>()
248 })
249 .unwrap_or_default();
250 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
251 return Err((
252 StatusCode::BAD_REQUEST,
253 Json(json!({
254 "error": "InvalidRequest",
255 "message": "Server rotation key not included in PLC DID data"
256 })),
257 ));
258 }
259 }
260
261 let doc_signing_key = doc_data
262 .get("verificationMethods")
263 .and_then(|v| v.get("atproto"))
264 .and_then(|k| k.as_str());
265
266 let user_row = sqlx::query!(
267 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
268 did
269 )
270 .fetch_optional(db)
271 .await
272 .map_err(|e| {
273 error!("Failed to fetch user key: {:?}", e);
274 (
275 StatusCode::INTERNAL_SERVER_ERROR,
276 Json(json!({"error": "InternalError"})),
277 )
278 })?;
279
280 if let Some(row) = user_row {
281 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
282 .map_err(|e| {
283 error!("Failed to decrypt user key: {}", e);
284 (
285 StatusCode::INTERNAL_SERVER_ERROR,
286 Json(json!({"error": "InternalError"})),
287 )
288 })?;
289 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
290 error!("Failed to create signing key: {:?}", e);
291 (
292 StatusCode::INTERNAL_SERVER_ERROR,
293 Json(json!({"error": "InternalError"})),
294 )
295 })?;
296 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
297
298 if doc_signing_key != Some(&expected_did_key) {
299 warn!(
300 "DID {} has signing key {:?}, expected {}",
301 did, doc_signing_key, expected_did_key
302 );
303 return Err((
304 StatusCode::BAD_REQUEST,
305 Json(json!({
306 "error": "InvalidRequest",
307 "message": "DID document verification method does not match expected signing key"
308 })),
309 ));
310 }
311 }
312 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
313 let client = reqwest::Client::new();
314 let decoded = host_and_path.replace("%3A", ":");
315 let parts: Vec<&str> = decoded.split(':').collect();
316 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
317 {
318 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
319 } else {
320 (parts[0].to_string(), parts[1..].to_vec())
321 };
322 let scheme =
323 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
324 "http"
325 } else {
326 "https"
327 };
328 let url = if path_parts.is_empty() {
329 format!("{}://{}/.well-known/did.json", scheme, host)
330 } else {
331 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
332 };
333 let resp = client.get(&url).send().await.map_err(|e| {
334 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
335 (
336 StatusCode::BAD_REQUEST,
337 Json(json!({
338 "error": "InvalidRequest",
339 "message": format!("Could not resolve DID document: {}", e)
340 })),
341 )
342 })?;
343 let doc: serde_json::Value = resp.json().await.map_err(|e| {
344 warn!("Failed to parse did:web document for {}: {:?}", did, e);
345 (
346 StatusCode::BAD_REQUEST,
347 Json(json!({
348 "error": "InvalidRequest",
349 "message": format!("Could not parse DID document: {}", e)
350 })),
351 )
352 })?;
353
354 let pds_endpoint = doc
355 .get("service")
356 .and_then(|s| s.as_array())
357 .and_then(|arr| {
358 arr.iter().find(|svc| {
359 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
360 || svc.get("type").and_then(|t| t.as_str())
361 == Some("AtprotoPersonalDataServer")
362 })
363 })
364 .and_then(|svc| svc.get("serviceEndpoint"))
365 .and_then(|e| e.as_str());
366
367 if pds_endpoint != Some(&expected_endpoint) {
368 warn!(
369 "DID {} has endpoint {:?}, expected {}",
370 did, pds_endpoint, expected_endpoint
371 );
372 return Err((
373 StatusCode::BAD_REQUEST,
374 Json(json!({
375 "error": "InvalidRequest",
376 "message": "DID document atproto_pds service endpoint does not match PDS public url"
377 })),
378 ));
379 }
380 }
381
382 Ok(())
383}
384
385pub async fn activate_account(
386 State(state): State<AppState>,
387 headers: axum::http::HeaderMap,
388) -> Response {
389 info!("[MIGRATION] activateAccount called");
390 let extracted = match crate::auth::extract_auth_token_from_header(
391 headers.get("Authorization").and_then(|h| h.to_str().ok()),
392 ) {
393 Some(t) => t,
394 None => {
395 info!("[MIGRATION] activateAccount: No auth token");
396 return ApiError::AuthenticationRequired.into_response();
397 }
398 };
399 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
400 let http_uri = format!(
401 "https://{}/xrpc/com.atproto.server.activateAccount",
402 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
403 );
404 let auth_user = match crate::auth::validate_token_with_dpop(
405 &state.db,
406 &extracted.token,
407 extracted.is_dpop,
408 dpop_proof,
409 "POST",
410 &http_uri,
411 true,
412 )
413 .await
414 {
415 Ok(user) => user,
416 Err(e) => {
417 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
418 return ApiError::from(e).into_response();
419 }
420 };
421 info!(
422 "[MIGRATION] activateAccount: Authenticated user did={}",
423 auth_user.did
424 );
425
426 if let Err(e) = crate::auth::scope_check::check_account_scope(
427 auth_user.is_oauth,
428 auth_user.scope.as_deref(),
429 crate::oauth::scopes::AccountAttr::Repo,
430 crate::oauth::scopes::AccountAction::Manage,
431 ) {
432 info!("[MIGRATION] activateAccount: Scope check failed");
433 return e;
434 }
435
436 let did = auth_user.did;
437
438 info!(
439 "[MIGRATION] activateAccount: Validating DID document for did={}",
440 did
441 );
442 let did_validation_start = std::time::Instant::now();
443 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await {
444 info!(
445 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
446 did,
447 did_validation_start.elapsed()
448 );
449 return (status, json).into_response();
450 }
451 info!(
452 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
453 did,
454 did_validation_start.elapsed()
455 );
456
457 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
458 .fetch_optional(&state.db)
459 .await
460 .ok()
461 .flatten();
462 info!(
463 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
464 did, handle
465 );
466 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
467 .execute(&state.db)
468 .await;
469 match result {
470 Ok(_) => {
471 info!(
472 "[MIGRATION] activateAccount: DB update success for did={}",
473 did
474 );
475 if let Some(ref h) = handle {
476 let _ = state.cache.delete(&format!("handle:{}", h)).await;
477 }
478 info!(
479 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
480 did
481 );
482 if let Err(e) =
483 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
484 {
485 warn!(
486 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
487 e
488 );
489 } else {
490 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
491 }
492 info!(
493 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
494 did, handle
495 );
496 if let Err(e) =
497 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
498 .await
499 {
500 warn!(
501 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
502 e
503 );
504 } else {
505 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
506 }
507 let repo_root = sqlx::query_scalar!(
508 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
509 did
510 )
511 .fetch_optional(&state.db)
512 .await
513 .ok()
514 .flatten();
515 if let Some(root_cid) = repo_root {
516 info!(
517 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
518 did, root_cid
519 );
520 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
521 if let Ok(Some(block)) = state.block_store.get(&cid).await {
522 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
523 } else {
524 None
525 }
526 } else {
527 None
528 };
529 if let Err(e) = crate::api::repo::record::sequence_sync_event(
530 &state,
531 &did,
532 &root_cid,
533 rev.as_deref(),
534 )
535 .await
536 {
537 warn!(
538 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
539 e
540 );
541 } else {
542 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
543 }
544 } else {
545 warn!(
546 "[MIGRATION] activateAccount: No repo root found for did={}",
547 did
548 );
549 }
550 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
551 (StatusCode::OK, Json(json!({}))).into_response()
552 }
553 Err(e) => {
554 error!(
555 "[MIGRATION] activateAccount: DB error activating account: {:?}",
556 e
557 );
558 (
559 StatusCode::INTERNAL_SERVER_ERROR,
560 Json(json!({"error": "InternalError"})),
561 )
562 .into_response()
563 }
564 }
565}
566
567#[derive(Deserialize)]
568#[serde(rename_all = "camelCase")]
569pub struct DeactivateAccountInput {
570 pub delete_after: Option<String>,
571 pub migrating_to: Option<String>,
572}
573
574pub async fn deactivate_account(
575 State(state): State<AppState>,
576 headers: axum::http::HeaderMap,
577 Json(input): Json<DeactivateAccountInput>,
578) -> Response {
579 let extracted = match crate::auth::extract_auth_token_from_header(
580 headers.get("Authorization").and_then(|h| h.to_str().ok()),
581 ) {
582 Some(t) => t,
583 None => return ApiError::AuthenticationRequired.into_response(),
584 };
585 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
586 let http_uri = format!(
587 "https://{}/xrpc/com.atproto.server.deactivateAccount",
588 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
589 );
590 let auth_user = match crate::auth::validate_token_with_dpop(
591 &state.db,
592 &extracted.token,
593 extracted.is_dpop,
594 dpop_proof,
595 "POST",
596 &http_uri,
597 false,
598 )
599 .await
600 {
601 Ok(user) => user,
602 Err(e) => return ApiError::from(e).into_response(),
603 };
604
605 if let Err(e) = crate::auth::scope_check::check_account_scope(
606 auth_user.is_oauth,
607 auth_user.scope.as_deref(),
608 crate::oauth::scopes::AccountAttr::Repo,
609 crate::oauth::scopes::AccountAction::Manage,
610 ) {
611 return e;
612 }
613
614 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
615 .delete_after
616 .as_ref()
617 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
618 .map(|dt| dt.with_timezone(&chrono::Utc));
619
620 let did = auth_user.did;
621
622 let migrating_to = if let Some(ref url) = input.migrating_to {
623 let url = url.trim().trim_end_matches('/');
624 if url.is_empty() || !did.starts_with("did:web:") {
625 None
626 } else {
627 if !url.starts_with("https://") {
628 return ApiError::InvalidRequest("migratingTo must start with https://".into())
629 .into_response();
630 }
631 Some(url.to_string())
632 }
633 } else {
634 None
635 };
636
637 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
638 .fetch_optional(&state.db)
639 .await
640 .ok()
641 .flatten();
642
643 let result = if let Some(ref pds_url) = migrating_to {
644 sqlx::query!(
645 "UPDATE users SET deactivated_at = NOW(), delete_after = $2, migrated_to_pds = $3, migrated_at = NOW() WHERE did = $1",
646 did,
647 delete_after,
648 pds_url
649 )
650 .execute(&state.db)
651 .await
652 } else {
653 sqlx::query!(
654 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
655 did,
656 delete_after
657 )
658 .execute(&state.db)
659 .await
660 };
661
662 let status = if migrating_to.is_some() {
663 "migrated"
664 } else {
665 "deactivated"
666 };
667
668 match result {
669 Ok(_) => {
670 if let Some(ref h) = handle {
671 let _ = state.cache.delete(&format!("handle:{}", h)).await;
672 }
673 if let Err(e) =
674 crate::api::repo::record::sequence_account_event(&state, &did, false, Some(status))
675 .await
676 {
677 warn!("Failed to sequence account {} event: {}", status, e);
678 }
679 (StatusCode::OK, Json(json!({}))).into_response()
680 }
681 Err(e) => {
682 error!("DB error deactivating account: {:?}", e);
683 (
684 StatusCode::INTERNAL_SERVER_ERROR,
685 Json(json!({"error": "InternalError"})),
686 )
687 .into_response()
688 }
689 }
690}
691
692pub async fn request_account_delete(
693 State(state): State<AppState>,
694 headers: axum::http::HeaderMap,
695) -> Response {
696 let extracted = match crate::auth::extract_auth_token_from_header(
697 headers.get("Authorization").and_then(|h| h.to_str().ok()),
698 ) {
699 Some(t) => t,
700 None => return ApiError::AuthenticationRequired.into_response(),
701 };
702 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
703 let http_uri = format!(
704 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
705 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
706 );
707 let validated = match crate::auth::validate_token_with_dpop(
708 &state.db,
709 &extracted.token,
710 extracted.is_dpop,
711 dpop_proof,
712 "POST",
713 &http_uri,
714 true,
715 )
716 .await
717 {
718 Ok(user) => user,
719 Err(e) => return ApiError::from(e).into_response(),
720 };
721 let did = validated.did.clone();
722
723 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await {
724 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await;
725 }
726
727 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
728 .fetch_optional(&state.db)
729 .await
730 {
731 Ok(Some(id)) => id,
732 _ => {
733 return (
734 StatusCode::INTERNAL_SERVER_ERROR,
735 Json(json!({"error": "InternalError"})),
736 )
737 .into_response();
738 }
739 };
740 let confirmation_token = Uuid::new_v4().to_string();
741 let expires_at = Utc::now() + Duration::minutes(15);
742 let insert = sqlx::query!(
743 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
744 confirmation_token,
745 did,
746 expires_at
747 )
748 .execute(&state.db)
749 .await;
750 if let Err(e) = insert {
751 error!("DB error creating deletion token: {:?}", e);
752 return (
753 StatusCode::INTERNAL_SERVER_ERROR,
754 Json(json!({"error": "InternalError"})),
755 )
756 .into_response();
757 }
758 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
759 if let Err(e) =
760 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
761 .await
762 {
763 warn!("Failed to enqueue account deletion notification: {:?}", e);
764 }
765 info!("Account deletion requested for user {}", did);
766 (StatusCode::OK, Json(json!({}))).into_response()
767}
768
769#[derive(Deserialize)]
770pub struct DeleteAccountInput {
771 pub did: String,
772 pub password: String,
773 pub token: String,
774}
775
776pub async fn delete_account(
777 State(state): State<AppState>,
778 Json(input): Json<DeleteAccountInput>,
779) -> Response {
780 let did = input.did.trim();
781 let password = &input.password;
782 let token = input.token.trim();
783 if did.is_empty() {
784 return (
785 StatusCode::BAD_REQUEST,
786 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
787 )
788 .into_response();
789 }
790 if password.is_empty() {
791 return (
792 StatusCode::BAD_REQUEST,
793 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
794 )
795 .into_response();
796 }
797 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
798 if password.len() > OLD_PASSWORD_MAX_LENGTH {
799 return (
800 StatusCode::BAD_REQUEST,
801 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})),
802 )
803 .into_response();
804 }
805 if token.is_empty() {
806 return (
807 StatusCode::BAD_REQUEST,
808 Json(json!({"error": "InvalidToken", "message": "token is required"})),
809 )
810 .into_response();
811 }
812 let user = sqlx::query!(
813 "SELECT id, password_hash, handle FROM users WHERE did = $1",
814 did
815 )
816 .fetch_optional(&state.db)
817 .await;
818 let (user_id, password_hash, handle) = match user {
819 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
820 Ok(None) => {
821 return (
822 StatusCode::BAD_REQUEST,
823 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
824 )
825 .into_response();
826 }
827 Err(e) => {
828 error!("DB error in delete_account: {:?}", e);
829 return (
830 StatusCode::INTERNAL_SERVER_ERROR,
831 Json(json!({"error": "InternalError"})),
832 )
833 .into_response();
834 }
835 };
836 let password_valid = if password_hash
837 .as_ref()
838 .map(|h| verify(password, h).unwrap_or(false))
839 .unwrap_or(false)
840 {
841 true
842 } else {
843 let app_pass_rows = sqlx::query!(
844 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
845 user_id
846 )
847 .fetch_all(&state.db)
848 .await
849 .unwrap_or_default();
850 app_pass_rows
851 .iter()
852 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
853 };
854 if !password_valid {
855 return (
856 StatusCode::UNAUTHORIZED,
857 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
858 )
859 .into_response();
860 }
861 let deletion_request = sqlx::query!(
862 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
863 token
864 )
865 .fetch_optional(&state.db)
866 .await;
867 let (token_did, expires_at) = match deletion_request {
868 Ok(Some(row)) => (row.did, row.expires_at),
869 Ok(None) => {
870 return (
871 StatusCode::BAD_REQUEST,
872 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
873 )
874 .into_response();
875 }
876 Err(e) => {
877 error!("DB error fetching deletion token: {:?}", e);
878 return (
879 StatusCode::INTERNAL_SERVER_ERROR,
880 Json(json!({"error": "InternalError"})),
881 )
882 .into_response();
883 }
884 };
885 if token_did != did {
886 return (
887 StatusCode::BAD_REQUEST,
888 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
889 )
890 .into_response();
891 }
892 if Utc::now() > expires_at {
893 let _ = sqlx::query!(
894 "DELETE FROM account_deletion_requests WHERE token = $1",
895 token
896 )
897 .execute(&state.db)
898 .await;
899 return (
900 StatusCode::BAD_REQUEST,
901 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
902 )
903 .into_response();
904 }
905 let mut tx = match state.db.begin().await {
906 Ok(tx) => tx,
907 Err(e) => {
908 error!("Failed to begin transaction: {:?}", e);
909 return (
910 StatusCode::INTERNAL_SERVER_ERROR,
911 Json(json!({"error": "InternalError"})),
912 )
913 .into_response();
914 }
915 };
916 let deletion_result: Result<(), sqlx::Error> = async {
917 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
918 .execute(&mut *tx)
919 .await?;
920 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
921 .execute(&mut *tx)
922 .await?;
923 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
924 .execute(&mut *tx)
925 .await?;
926 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
927 .execute(&mut *tx)
928 .await?;
929 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
930 .execute(&mut *tx)
931 .await?;
932 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
933 .execute(&mut *tx)
934 .await?;
935 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
936 .execute(&mut *tx)
937 .await?;
938 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
939 .execute(&mut *tx)
940 .await?;
941 Ok(())
942 }
943 .await;
944 match deletion_result {
945 Ok(()) => {
946 if let Err(e) = tx.commit().await {
947 error!("Failed to commit account deletion transaction: {:?}", e);
948 return (
949 StatusCode::INTERNAL_SERVER_ERROR,
950 Json(json!({"error": "InternalError"})),
951 )
952 .into_response();
953 }
954 let account_seq = crate::api::repo::record::sequence_account_event(
955 &state,
956 did,
957 false,
958 Some("deleted"),
959 )
960 .await;
961 match account_seq {
962 Ok(seq) => {
963 if let Err(e) = sqlx::query!(
964 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
965 did,
966 seq
967 )
968 .execute(&state.db)
969 .await
970 {
971 warn!(
972 "Failed to cleanup sequences for deleted account {}: {}",
973 did, e
974 );
975 }
976 }
977 Err(e) => {
978 warn!(
979 "Failed to sequence account deletion event for {}: {}",
980 did, e
981 );
982 }
983 }
984 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
985 info!("Account {} deleted successfully", did);
986 (StatusCode::OK, Json(json!({}))).into_response()
987 }
988 Err(e) => {
989 error!("DB error deleting account, rolling back: {:?}", e);
990 (
991 StatusCode::INTERNAL_SERVER_ERROR,
992 Json(json!({"error": "InternalError"})),
993 )
994 .into_response()
995 }
996 }
997}