this repo has no description
1use crate::api::ApiError;
2use crate::plc::PlcClient;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::{Duration, Utc};
12use cid::Cid;
13use jacquard_repo::commit::Commit;
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use tracing::{error, info, warn};
20use uuid::Uuid;
21
22#[derive(Serialize)]
23#[serde(rename_all = "camelCase")]
24pub struct CheckAccountStatusOutput {
25 pub activated: bool,
26 pub valid_did: bool,
27 pub repo_commit: String,
28 pub repo_rev: String,
29 pub repo_blocks: i64,
30 pub indexed_records: i64,
31 pub private_state_values: i64,
32 pub expected_blobs: i64,
33 pub imported_blobs: i64,
34}
35
36pub async fn check_account_status(
37 State(state): State<AppState>,
38 headers: axum::http::HeaderMap,
39) -> Response {
40 let extracted = match crate::auth::extract_auth_token_from_header(
41 headers.get("Authorization").and_then(|h| h.to_str().ok()),
42 ) {
43 Some(t) => t,
44 None => return ApiError::AuthenticationRequired.into_response(),
45 };
46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
47 let http_uri = format!(
48 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
50 );
51 let did = match crate::auth::validate_token_with_dpop(
52 &state.db,
53 &extracted.token,
54 extracted.is_dpop,
55 dpop_proof,
56 "GET",
57 &http_uri,
58 true,
59 )
60 .await
61 {
62 Ok(user) => user.did,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(id)) => id,
70 _ => {
71 return (
72 StatusCode::INTERNAL_SERVER_ERROR,
73 Json(json!({"error": "InternalError"})),
74 )
75 .into_response();
76 }
77 };
78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
79 .fetch_optional(&state.db)
80 .await;
81 let deactivated_at = match user_status {
82 Ok(Some(row)) => row.deactivated_at,
83 _ => None,
84 };
85 let repo_result = sqlx::query!(
86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
87 user_id
88 )
89 .fetch_optional(&state.db)
90 .await;
91 let (repo_commit, repo_rev_from_db) = match repo_result {
92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
93 _ => (String::new(), None),
94 };
95 let block_count: i64 = sqlx::query_scalar!(
96 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
97 user_id
98 )
99 .fetch_one(&state.db)
100 .await
101 .unwrap_or(Some(0))
102 .unwrap_or(0);
103 let repo_rev = if let Some(rev) = repo_rev_from_db {
104 rev
105 } else if !repo_commit.is_empty() {
106 if let Ok(cid) = Cid::from_str(&repo_commit) {
107 if let Ok(Some(block)) = state.block_store.get(&cid).await {
108 Commit::from_cbor(&block)
109 .ok()
110 .map(|c| c.rev().to_string())
111 .unwrap_or_default()
112 } else {
113 String::new()
114 }
115 } else {
116 String::new()
117 }
118 } else {
119 String::new()
120 };
121 let record_count: i64 =
122 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
123 .fetch_one(&state.db)
124 .await
125 .unwrap_or(Some(0))
126 .unwrap_or(0);
127 let imported_blobs: i64 = sqlx::query_scalar!(
128 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
129 user_id
130 )
131 .fetch_one(&state.db)
132 .await
133 .unwrap_or(Some(0))
134 .unwrap_or(0);
135 let expected_blobs: i64 = sqlx::query_scalar!(
136 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
137 user_id
138 )
139 .fetch_one(&state.db)
140 .await
141 .unwrap_or(Some(0))
142 .unwrap_or(0);
143 let valid_did = is_valid_did_for_service(&state.db, &did).await;
144 (
145 StatusCode::OK,
146 Json(CheckAccountStatusOutput {
147 activated: deactivated_at.is_none(),
148 valid_did,
149 repo_commit: repo_commit.clone(),
150 repo_rev,
151 repo_blocks: block_count as i64,
152 indexed_records: record_count,
153 private_state_values: 0,
154 expected_blobs,
155 imported_blobs,
156 }),
157 )
158 .into_response()
159}
160
161async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool {
162 assert_valid_did_document_for_service(db, did, false)
163 .await
164 .is_ok()
165}
166
167async fn assert_valid_did_document_for_service(
168 db: &sqlx::PgPool,
169 did: &str,
170 with_retry: bool,
171) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
173 let expected_endpoint = format!("https://{}", hostname);
174
175 if did.starts_with("did:plc:") {
176 let plc_client = PlcClient::new(None);
177
178 let max_attempts = if with_retry { 5 } else { 1 };
179 let mut last_error = None;
180 let mut doc_data = None;
181 for attempt in 0..max_attempts {
182 if attempt > 0 {
183 let delay_ms = 500 * (1 << (attempt - 1));
184 info!(
185 "Waiting {}ms before retry {} for DID document validation ({})",
186 delay_ms, attempt, did
187 );
188 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
189 }
190
191 match plc_client.get_document_data(did).await {
192 Ok(data) => {
193 let pds_endpoint = data
194 .get("services")
195 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
196 .and_then(|p| p.get("endpoint"))
197 .and_then(|e| e.as_str());
198
199 if pds_endpoint == Some(&expected_endpoint) {
200 doc_data = Some(data);
201 break;
202 } else {
203 info!(
204 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
205 attempt + 1,
206 did,
207 pds_endpoint,
208 expected_endpoint
209 );
210 last_error = Some(format!(
211 "DID document endpoint {:?} does not match expected {}",
212 pds_endpoint, expected_endpoint
213 ));
214 }
215 }
216 Err(e) => {
217 warn!(
218 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
219 attempt + 1,
220 did,
221 e
222 );
223 last_error = Some(format!("Could not resolve DID document: {}", e));
224 }
225 }
226 }
227
228 let doc_data = match doc_data {
229 Some(d) => d,
230 None => {
231 return Err((
232 StatusCode::BAD_REQUEST,
233 Json(json!({
234 "error": "InvalidRequest",
235 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string())
236 })),
237 ));
238 }
239 };
240
241 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
242 if let Some(ref expected_rotation_key) = server_rotation_key {
243 let rotation_keys = doc_data
244 .get("rotationKeys")
245 .and_then(|v| v.as_array())
246 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
247 .unwrap_or_default();
248 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
249 return Err((
250 StatusCode::BAD_REQUEST,
251 Json(json!({
252 "error": "InvalidRequest",
253 "message": "Server rotation key not included in PLC DID data"
254 })),
255 ));
256 }
257 }
258
259 let doc_signing_key = doc_data
260 .get("verificationMethods")
261 .and_then(|v| v.get("atproto"))
262 .and_then(|k| k.as_str());
263
264 let user_row = sqlx::query!(
265 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
266 did
267 )
268 .fetch_optional(db)
269 .await
270 .map_err(|e| {
271 error!("Failed to fetch user key: {:?}", e);
272 (
273 StatusCode::INTERNAL_SERVER_ERROR,
274 Json(json!({"error": "InternalError"})),
275 )
276 })?;
277
278 if let Some(row) = user_row {
279 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
280 .map_err(|e| {
281 error!("Failed to decrypt user key: {}", e);
282 (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError"})),
285 )
286 })?;
287 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
288 error!("Failed to create signing key: {:?}", e);
289 (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError"})),
292 )
293 })?;
294 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
295
296 if doc_signing_key != Some(&expected_did_key) {
297 warn!(
298 "DID {} has signing key {:?}, expected {}",
299 did, doc_signing_key, expected_did_key
300 );
301 return Err((
302 StatusCode::BAD_REQUEST,
303 Json(json!({
304 "error": "InvalidRequest",
305 "message": "DID document verification method does not match expected signing key"
306 })),
307 ));
308 }
309 }
310 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
311 let client = reqwest::Client::new();
312 let decoded = host_and_path.replace("%3A", ":");
313 let parts: Vec<&str> = decoded.split(':').collect();
314 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
315 {
316 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
317 } else {
318 (parts[0].to_string(), parts[1..].to_vec())
319 };
320 let scheme =
321 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
322 "http"
323 } else {
324 "https"
325 };
326 let url = if path_parts.is_empty() {
327 format!("{}://{}/.well-known/did.json", scheme, host)
328 } else {
329 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
330 };
331 let resp = client.get(&url).send().await.map_err(|e| {
332 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
333 (
334 StatusCode::BAD_REQUEST,
335 Json(json!({
336 "error": "InvalidRequest",
337 "message": format!("Could not resolve DID document: {}", e)
338 })),
339 )
340 })?;
341 let doc: serde_json::Value = resp.json().await.map_err(|e| {
342 warn!("Failed to parse did:web document for {}: {:?}", did, e);
343 (
344 StatusCode::BAD_REQUEST,
345 Json(json!({
346 "error": "InvalidRequest",
347 "message": format!("Could not parse DID document: {}", e)
348 })),
349 )
350 })?;
351
352 let pds_endpoint = doc
353 .get("service")
354 .and_then(|s| s.as_array())
355 .and_then(|arr| {
356 arr.iter().find(|svc| {
357 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
358 || svc.get("type").and_then(|t| t.as_str())
359 == Some("AtprotoPersonalDataServer")
360 })
361 })
362 .and_then(|svc| svc.get("serviceEndpoint"))
363 .and_then(|e| e.as_str());
364
365 if pds_endpoint != Some(&expected_endpoint) {
366 warn!(
367 "DID {} has endpoint {:?}, expected {}",
368 did, pds_endpoint, expected_endpoint
369 );
370 return Err((
371 StatusCode::BAD_REQUEST,
372 Json(json!({
373 "error": "InvalidRequest",
374 "message": "DID document atproto_pds service endpoint does not match PDS public url"
375 })),
376 ));
377 }
378 }
379
380 Ok(())
381}
382
383pub async fn activate_account(
384 State(state): State<AppState>,
385 headers: axum::http::HeaderMap,
386) -> Response {
387 info!("[MIGRATION] activateAccount called");
388 let extracted = match crate::auth::extract_auth_token_from_header(
389 headers.get("Authorization").and_then(|h| h.to_str().ok()),
390 ) {
391 Some(t) => t,
392 None => {
393 info!("[MIGRATION] activateAccount: No auth token");
394 return ApiError::AuthenticationRequired.into_response();
395 }
396 };
397 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
398 let http_uri = format!(
399 "https://{}/xrpc/com.atproto.server.activateAccount",
400 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
401 );
402 let auth_user = match crate::auth::validate_token_with_dpop(
403 &state.db,
404 &extracted.token,
405 extracted.is_dpop,
406 dpop_proof,
407 "POST",
408 &http_uri,
409 true,
410 )
411 .await
412 {
413 Ok(user) => user,
414 Err(e) => {
415 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
416 return ApiError::from(e).into_response();
417 }
418 };
419 info!(
420 "[MIGRATION] activateAccount: Authenticated user did={}",
421 auth_user.did
422 );
423
424 if let Err(e) = crate::auth::scope_check::check_account_scope(
425 auth_user.is_oauth,
426 auth_user.scope.as_deref(),
427 crate::oauth::scopes::AccountAttr::Repo,
428 crate::oauth::scopes::AccountAction::Manage,
429 ) {
430 info!("[MIGRATION] activateAccount: Scope check failed");
431 return e;
432 }
433
434 let did = auth_user.did;
435
436 info!(
437 "[MIGRATION] activateAccount: Validating DID document for did={}",
438 did
439 );
440 let did_validation_start = std::time::Instant::now();
441 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await
442 {
443 info!(
444 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
445 did,
446 did_validation_start.elapsed()
447 );
448 return (status, json).into_response();
449 }
450 info!(
451 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
452 did,
453 did_validation_start.elapsed()
454 );
455
456 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
457 .fetch_optional(&state.db)
458 .await
459 .ok()
460 .flatten();
461 info!(
462 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
463 did, handle
464 );
465 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
466 .execute(&state.db)
467 .await;
468 match result {
469 Ok(_) => {
470 info!(
471 "[MIGRATION] activateAccount: DB update success for did={}",
472 did
473 );
474 if let Some(ref h) = handle {
475 let _ = state.cache.delete(&format!("handle:{}", h)).await;
476 }
477 info!(
478 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
479 did
480 );
481 if let Err(e) =
482 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
483 {
484 warn!(
485 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
486 e
487 );
488 } else {
489 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
490 }
491 info!(
492 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
493 did, handle
494 );
495 if let Err(e) =
496 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
497 .await
498 {
499 warn!(
500 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
501 e
502 );
503 } else {
504 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
505 }
506 let repo_root = sqlx::query_scalar!(
507 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
508 did
509 )
510 .fetch_optional(&state.db)
511 .await
512 .ok()
513 .flatten();
514 if let Some(root_cid) = repo_root {
515 info!(
516 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
517 did, root_cid
518 );
519 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
520 if let Ok(Some(block)) = state.block_store.get(&cid).await {
521 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
522 } else {
523 None
524 }
525 } else {
526 None
527 };
528 if let Err(e) = crate::api::repo::record::sequence_sync_event(
529 &state,
530 &did,
531 &root_cid,
532 rev.as_deref(),
533 )
534 .await
535 {
536 warn!(
537 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
538 e
539 );
540 } else {
541 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
542 }
543 } else {
544 warn!(
545 "[MIGRATION] activateAccount: No repo root found for did={}",
546 did
547 );
548 }
549 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
550 (StatusCode::OK, Json(json!({}))).into_response()
551 }
552 Err(e) => {
553 error!(
554 "[MIGRATION] activateAccount: DB error activating account: {:?}",
555 e
556 );
557 (
558 StatusCode::INTERNAL_SERVER_ERROR,
559 Json(json!({"error": "InternalError"})),
560 )
561 .into_response()
562 }
563 }
564}
565
566#[derive(Deserialize)]
567#[serde(rename_all = "camelCase")]
568pub struct DeactivateAccountInput {
569 pub delete_after: Option<String>,
570 pub migrating_to: Option<String>,
571}
572
573pub async fn deactivate_account(
574 State(state): State<AppState>,
575 headers: axum::http::HeaderMap,
576 Json(input): Json<DeactivateAccountInput>,
577) -> Response {
578 let extracted = match crate::auth::extract_auth_token_from_header(
579 headers.get("Authorization").and_then(|h| h.to_str().ok()),
580 ) {
581 Some(t) => t,
582 None => return ApiError::AuthenticationRequired.into_response(),
583 };
584 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
585 let http_uri = format!(
586 "https://{}/xrpc/com.atproto.server.deactivateAccount",
587 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
588 );
589 let auth_user = match crate::auth::validate_token_with_dpop(
590 &state.db,
591 &extracted.token,
592 extracted.is_dpop,
593 dpop_proof,
594 "POST",
595 &http_uri,
596 false,
597 )
598 .await
599 {
600 Ok(user) => user,
601 Err(e) => return ApiError::from(e).into_response(),
602 };
603
604 if let Err(e) = crate::auth::scope_check::check_account_scope(
605 auth_user.is_oauth,
606 auth_user.scope.as_deref(),
607 crate::oauth::scopes::AccountAttr::Repo,
608 crate::oauth::scopes::AccountAction::Manage,
609 ) {
610 return e;
611 }
612
613 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
614 .delete_after
615 .as_ref()
616 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
617 .map(|dt| dt.with_timezone(&chrono::Utc));
618
619 let did = auth_user.did;
620
621 let migrating_to = if let Some(ref url) = input.migrating_to {
622 let url = url.trim().trim_end_matches('/');
623 if url.is_empty() || !did.starts_with("did:web:") {
624 None
625 } else {
626 if !url.starts_with("https://") {
627 return ApiError::InvalidRequest("migratingTo must start with https://".into())
628 .into_response();
629 }
630 Some(url.to_string())
631 }
632 } else {
633 None
634 };
635
636 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
637 .fetch_optional(&state.db)
638 .await
639 .ok()
640 .flatten();
641
642 let result = if let Some(ref pds_url) = migrating_to {
643 sqlx::query!(
644 "UPDATE users SET deactivated_at = NOW(), delete_after = $2, migrated_to_pds = $3, migrated_at = NOW() WHERE did = $1",
645 did,
646 delete_after,
647 pds_url
648 )
649 .execute(&state.db)
650 .await
651 } else {
652 sqlx::query!(
653 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
654 did,
655 delete_after
656 )
657 .execute(&state.db)
658 .await
659 };
660
661 let status = if migrating_to.is_some() {
662 "migrated"
663 } else {
664 "deactivated"
665 };
666
667 match result {
668 Ok(_) => {
669 if let Some(ref h) = handle {
670 let _ = state.cache.delete(&format!("handle:{}", h)).await;
671 }
672 if let Err(e) =
673 crate::api::repo::record::sequence_account_event(&state, &did, false, Some(status))
674 .await
675 {
676 warn!("Failed to sequence account {} event: {}", status, e);
677 }
678 (StatusCode::OK, Json(json!({}))).into_response()
679 }
680 Err(e) => {
681 error!("DB error deactivating account: {:?}", e);
682 (
683 StatusCode::INTERNAL_SERVER_ERROR,
684 Json(json!({"error": "InternalError"})),
685 )
686 .into_response()
687 }
688 }
689}
690
691pub async fn request_account_delete(
692 State(state): State<AppState>,
693 headers: axum::http::HeaderMap,
694) -> Response {
695 let extracted = match crate::auth::extract_auth_token_from_header(
696 headers.get("Authorization").and_then(|h| h.to_str().ok()),
697 ) {
698 Some(t) => t,
699 None => return ApiError::AuthenticationRequired.into_response(),
700 };
701 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
702 let http_uri = format!(
703 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
704 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
705 );
706 let validated = match crate::auth::validate_token_with_dpop(
707 &state.db,
708 &extracted.token,
709 extracted.is_dpop,
710 dpop_proof,
711 "POST",
712 &http_uri,
713 true,
714 )
715 .await
716 {
717 Ok(user) => user,
718 Err(e) => return ApiError::from(e).into_response(),
719 };
720 let did = validated.did.clone();
721
722 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await {
723 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await;
724 }
725
726 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
727 .fetch_optional(&state.db)
728 .await
729 {
730 Ok(Some(id)) => id,
731 _ => {
732 return (
733 StatusCode::INTERNAL_SERVER_ERROR,
734 Json(json!({"error": "InternalError"})),
735 )
736 .into_response();
737 }
738 };
739 let confirmation_token = Uuid::new_v4().to_string();
740 let expires_at = Utc::now() + Duration::minutes(15);
741 let insert = sqlx::query!(
742 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
743 confirmation_token,
744 did,
745 expires_at
746 )
747 .execute(&state.db)
748 .await;
749 if let Err(e) = insert {
750 error!("DB error creating deletion token: {:?}", e);
751 return (
752 StatusCode::INTERNAL_SERVER_ERROR,
753 Json(json!({"error": "InternalError"})),
754 )
755 .into_response();
756 }
757 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
758 if let Err(e) =
759 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
760 .await
761 {
762 warn!("Failed to enqueue account deletion notification: {:?}", e);
763 }
764 info!("Account deletion requested for user {}", did);
765 (StatusCode::OK, Json(json!({}))).into_response()
766}
767
768#[derive(Deserialize)]
769pub struct DeleteAccountInput {
770 pub did: String,
771 pub password: String,
772 pub token: String,
773}
774
775pub async fn delete_account(
776 State(state): State<AppState>,
777 Json(input): Json<DeleteAccountInput>,
778) -> Response {
779 let did = input.did.trim();
780 let password = &input.password;
781 let token = input.token.trim();
782 if did.is_empty() {
783 return (
784 StatusCode::BAD_REQUEST,
785 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
786 )
787 .into_response();
788 }
789 if password.is_empty() {
790 return (
791 StatusCode::BAD_REQUEST,
792 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
793 )
794 .into_response();
795 }
796 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
797 if password.len() > OLD_PASSWORD_MAX_LENGTH {
798 return (
799 StatusCode::BAD_REQUEST,
800 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})),
801 )
802 .into_response();
803 }
804 if token.is_empty() {
805 return (
806 StatusCode::BAD_REQUEST,
807 Json(json!({"error": "InvalidToken", "message": "token is required"})),
808 )
809 .into_response();
810 }
811 let user = sqlx::query!(
812 "SELECT id, password_hash, handle FROM users WHERE did = $1",
813 did
814 )
815 .fetch_optional(&state.db)
816 .await;
817 let (user_id, password_hash, handle) = match user {
818 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
819 Ok(None) => {
820 return (
821 StatusCode::BAD_REQUEST,
822 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
823 )
824 .into_response();
825 }
826 Err(e) => {
827 error!("DB error in delete_account: {:?}", e);
828 return (
829 StatusCode::INTERNAL_SERVER_ERROR,
830 Json(json!({"error": "InternalError"})),
831 )
832 .into_response();
833 }
834 };
835 let password_valid = if password_hash
836 .as_ref()
837 .map(|h| verify(password, h).unwrap_or(false))
838 .unwrap_or(false)
839 {
840 true
841 } else {
842 let app_pass_rows = sqlx::query!(
843 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
844 user_id
845 )
846 .fetch_all(&state.db)
847 .await
848 .unwrap_or_default();
849 app_pass_rows
850 .iter()
851 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
852 };
853 if !password_valid {
854 return (
855 StatusCode::UNAUTHORIZED,
856 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
857 )
858 .into_response();
859 }
860 let deletion_request = sqlx::query!(
861 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
862 token
863 )
864 .fetch_optional(&state.db)
865 .await;
866 let (token_did, expires_at) = match deletion_request {
867 Ok(Some(row)) => (row.did, row.expires_at),
868 Ok(None) => {
869 return (
870 StatusCode::BAD_REQUEST,
871 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
872 )
873 .into_response();
874 }
875 Err(e) => {
876 error!("DB error fetching deletion token: {:?}", e);
877 return (
878 StatusCode::INTERNAL_SERVER_ERROR,
879 Json(json!({"error": "InternalError"})),
880 )
881 .into_response();
882 }
883 };
884 if token_did != did {
885 return (
886 StatusCode::BAD_REQUEST,
887 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
888 )
889 .into_response();
890 }
891 if Utc::now() > expires_at {
892 let _ = sqlx::query!(
893 "DELETE FROM account_deletion_requests WHERE token = $1",
894 token
895 )
896 .execute(&state.db)
897 .await;
898 return (
899 StatusCode::BAD_REQUEST,
900 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
901 )
902 .into_response();
903 }
904 let mut tx = match state.db.begin().await {
905 Ok(tx) => tx,
906 Err(e) => {
907 error!("Failed to begin transaction: {:?}", e);
908 return (
909 StatusCode::INTERNAL_SERVER_ERROR,
910 Json(json!({"error": "InternalError"})),
911 )
912 .into_response();
913 }
914 };
915 let deletion_result: Result<(), sqlx::Error> = async {
916 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
917 .execute(&mut *tx)
918 .await?;
919 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
920 .execute(&mut *tx)
921 .await?;
922 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
923 .execute(&mut *tx)
924 .await?;
925 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
926 .execute(&mut *tx)
927 .await?;
928 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
929 .execute(&mut *tx)
930 .await?;
931 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
932 .execute(&mut *tx)
933 .await?;
934 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
935 .execute(&mut *tx)
936 .await?;
937 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
938 .execute(&mut *tx)
939 .await?;
940 Ok(())
941 }
942 .await;
943 match deletion_result {
944 Ok(()) => {
945 if let Err(e) = tx.commit().await {
946 error!("Failed to commit account deletion transaction: {:?}", e);
947 return (
948 StatusCode::INTERNAL_SERVER_ERROR,
949 Json(json!({"error": "InternalError"})),
950 )
951 .into_response();
952 }
953 let account_seq = crate::api::repo::record::sequence_account_event(
954 &state,
955 did,
956 false,
957 Some("deleted"),
958 )
959 .await;
960 match account_seq {
961 Ok(seq) => {
962 if let Err(e) = sqlx::query!(
963 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
964 did,
965 seq
966 )
967 .execute(&state.db)
968 .await
969 {
970 warn!(
971 "Failed to cleanup sequences for deleted account {}: {}",
972 did, e
973 );
974 }
975 }
976 Err(e) => {
977 warn!(
978 "Failed to sequence account deletion event for {}: {}",
979 did, e
980 );
981 }
982 }
983 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
984 info!("Account {} deleted successfully", did);
985 (StatusCode::OK, Json(json!({}))).into_response()
986 }
987 Err(e) => {
988 error!("DB error deleting account, rolling back: {:?}", e);
989 (
990 StatusCode::INTERNAL_SERVER_ERROR,
991 Json(json!({"error": "InternalError"})),
992 )
993 .into_response()
994 }
995 }
996}