this repo has no description
1use crate::api::ApiError;
2use crate::cache::Cache;
3use crate::plc::PlcClient;
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use bcrypt::verify;
12use chrono::{Duration, Utc};
13use cid::Cid;
14use jacquard_repo::commit::Commit;
15use jacquard_repo::storage::BlockStore;
16use k256::ecdsa::SigningKey;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::{error, info, warn};
22use uuid::Uuid;
23
24#[derive(Serialize)]
25#[serde(rename_all = "camelCase")]
26pub struct CheckAccountStatusOutput {
27 pub activated: bool,
28 pub valid_did: bool,
29 pub repo_commit: String,
30 pub repo_rev: String,
31 pub repo_blocks: i64,
32 pub indexed_records: i64,
33 pub private_state_values: i64,
34 pub expected_blobs: i64,
35 pub imported_blobs: i64,
36}
37
38pub async fn check_account_status(
39 State(state): State<AppState>,
40 headers: axum::http::HeaderMap,
41) -> Response {
42 let extracted = match crate::auth::extract_auth_token_from_header(
43 headers.get("Authorization").and_then(|h| h.to_str().ok()),
44 ) {
45 Some(t) => t,
46 None => return ApiError::AuthenticationRequired.into_response(),
47 };
48 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
49 let http_uri = format!(
50 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
51 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
52 );
53 let did = match crate::auth::validate_token_with_dpop(
54 &state.db,
55 &extracted.token,
56 extracted.is_dpop,
57 dpop_proof,
58 "GET",
59 &http_uri,
60 true,
61 )
62 .await
63 {
64 Ok(user) => user.did,
65 Err(e) => return ApiError::from(e).into_response(),
66 };
67 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
68 .fetch_optional(&state.db)
69 .await
70 {
71 Ok(Some(id)) => id,
72 _ => {
73 return (
74 StatusCode::INTERNAL_SERVER_ERROR,
75 Json(json!({"error": "InternalError"})),
76 )
77 .into_response();
78 }
79 };
80 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
81 .fetch_optional(&state.db)
82 .await;
83 let deactivated_at = match user_status {
84 Ok(Some(row)) => row.deactivated_at,
85 _ => None,
86 };
87 let repo_result = sqlx::query!(
88 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
89 user_id
90 )
91 .fetch_optional(&state.db)
92 .await;
93 let (repo_commit, repo_rev_from_db) = match repo_result {
94 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
95 _ => (String::new(), None),
96 };
97 let block_count: i64 = sqlx::query_scalar!(
98 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
99 user_id
100 )
101 .fetch_one(&state.db)
102 .await
103 .unwrap_or(Some(0))
104 .unwrap_or(0);
105 let repo_rev = if let Some(rev) = repo_rev_from_db {
106 rev
107 } else if !repo_commit.is_empty() {
108 if let Ok(cid) = Cid::from_str(&repo_commit) {
109 if let Ok(Some(block)) = state.block_store.get(&cid).await {
110 Commit::from_cbor(&block)
111 .ok()
112 .map(|c| c.rev().to_string())
113 .unwrap_or_default()
114 } else {
115 String::new()
116 }
117 } else {
118 String::new()
119 }
120 } else {
121 String::new()
122 };
123 let record_count: i64 =
124 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
125 .fetch_one(&state.db)
126 .await
127 .unwrap_or(Some(0))
128 .unwrap_or(0);
129 let imported_blobs: i64 = sqlx::query_scalar!(
130 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
131 user_id
132 )
133 .fetch_one(&state.db)
134 .await
135 .unwrap_or(Some(0))
136 .unwrap_or(0);
137 let expected_blobs: i64 = sqlx::query_scalar!(
138 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
139 user_id
140 )
141 .fetch_one(&state.db)
142 .await
143 .unwrap_or(Some(0))
144 .unwrap_or(0);
145 let valid_did = is_valid_did_for_service(&state.db, &state.cache, &did).await;
146 (
147 StatusCode::OK,
148 Json(CheckAccountStatusOutput {
149 activated: deactivated_at.is_none(),
150 valid_did,
151 repo_commit: repo_commit.clone(),
152 repo_rev,
153 repo_blocks: block_count as i64,
154 indexed_records: record_count,
155 private_state_values: 0,
156 expected_blobs,
157 imported_blobs,
158 }),
159 )
160 .into_response()
161}
162
163async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: &Arc<dyn Cache>, did: &str) -> bool {
164 assert_valid_did_document_for_service(db, cache, did, false)
165 .await
166 .is_ok()
167}
168
169async fn assert_valid_did_document_for_service(
170 db: &sqlx::PgPool,
171 cache: &Arc<dyn Cache>,
172 did: &str,
173 with_retry: bool,
174) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
175 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let expected_endpoint = format!("https://{}", hostname);
177
178 if did.starts_with("did:plc:") {
179 let plc_client = PlcClient::with_cache(None, Some(cache.clone()));
180
181 let max_attempts = if with_retry { 5 } else { 1 };
182 let mut last_error = None;
183 let mut doc_data = None;
184 for attempt in 0..max_attempts {
185 if attempt > 0 {
186 let delay_ms = 500 * (1 << (attempt - 1));
187 info!(
188 "Waiting {}ms before retry {} for DID document validation ({})",
189 delay_ms, attempt, did
190 );
191 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
192 }
193
194 match plc_client.get_document_data(did).await {
195 Ok(data) => {
196 let pds_endpoint = data
197 .get("services")
198 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
199 .and_then(|p| p.get("endpoint"))
200 .and_then(|e| e.as_str());
201
202 if pds_endpoint == Some(&expected_endpoint) {
203 doc_data = Some(data);
204 break;
205 } else {
206 info!(
207 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
208 attempt + 1,
209 did,
210 pds_endpoint,
211 expected_endpoint
212 );
213 last_error = Some(format!(
214 "DID document endpoint {:?} does not match expected {}",
215 pds_endpoint, expected_endpoint
216 ));
217 }
218 }
219 Err(e) => {
220 warn!(
221 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
222 attempt + 1,
223 did,
224 e
225 );
226 last_error = Some(format!("Could not resolve DID document: {}", e));
227 }
228 }
229 }
230
231 let doc_data = match doc_data {
232 Some(d) => d,
233 None => {
234 return Err((
235 StatusCode::BAD_REQUEST,
236 Json(json!({
237 "error": "InvalidRequest",
238 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string())
239 })),
240 ));
241 }
242 };
243
244 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
245 if let Some(ref expected_rotation_key) = server_rotation_key {
246 let rotation_keys = doc_data
247 .get("rotationKeys")
248 .and_then(|v| v.as_array())
249 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
250 .unwrap_or_default();
251 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
252 return Err((
253 StatusCode::BAD_REQUEST,
254 Json(json!({
255 "error": "InvalidRequest",
256 "message": "Server rotation key not included in PLC DID data"
257 })),
258 ));
259 }
260 }
261
262 let doc_signing_key = doc_data
263 .get("verificationMethods")
264 .and_then(|v| v.get("atproto"))
265 .and_then(|k| k.as_str());
266
267 let user_row = sqlx::query!(
268 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
269 did
270 )
271 .fetch_optional(db)
272 .await
273 .map_err(|e| {
274 error!("Failed to fetch user key: {:?}", e);
275 (
276 StatusCode::INTERNAL_SERVER_ERROR,
277 Json(json!({"error": "InternalError"})),
278 )
279 })?;
280
281 if let Some(row) = user_row {
282 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
283 .map_err(|e| {
284 error!("Failed to decrypt user key: {}", e);
285 (
286 StatusCode::INTERNAL_SERVER_ERROR,
287 Json(json!({"error": "InternalError"})),
288 )
289 })?;
290 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
291 error!("Failed to create signing key: {:?}", e);
292 (
293 StatusCode::INTERNAL_SERVER_ERROR,
294 Json(json!({"error": "InternalError"})),
295 )
296 })?;
297 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
298
299 if doc_signing_key != Some(&expected_did_key) {
300 warn!(
301 "DID {} has signing key {:?}, expected {}",
302 did, doc_signing_key, expected_did_key
303 );
304 return Err((
305 StatusCode::BAD_REQUEST,
306 Json(json!({
307 "error": "InvalidRequest",
308 "message": "DID document verification method does not match expected signing key"
309 })),
310 ));
311 }
312 }
313 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
314 let client = crate::api::proxy_client::did_resolution_client();
315 let decoded = host_and_path.replace("%3A", ":");
316 let parts: Vec<&str> = decoded.split(':').collect();
317 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
318 {
319 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
320 } else {
321 (parts[0].to_string(), parts[1..].to_vec())
322 };
323 let scheme =
324 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
325 "http"
326 } else {
327 "https"
328 };
329 let url = if path_parts.is_empty() {
330 format!("{}://{}/.well-known/did.json", scheme, host)
331 } else {
332 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
333 };
334 let resp = client.get(&url).send().await.map_err(|e| {
335 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
336 (
337 StatusCode::BAD_REQUEST,
338 Json(json!({
339 "error": "InvalidRequest",
340 "message": format!("Could not resolve DID document: {}", e)
341 })),
342 )
343 })?;
344 let doc: serde_json::Value = resp.json().await.map_err(|e| {
345 warn!("Failed to parse did:web document for {}: {:?}", did, e);
346 (
347 StatusCode::BAD_REQUEST,
348 Json(json!({
349 "error": "InvalidRequest",
350 "message": format!("Could not parse DID document: {}", e)
351 })),
352 )
353 })?;
354
355 let pds_endpoint = doc
356 .get("service")
357 .and_then(|s| s.as_array())
358 .and_then(|arr| {
359 arr.iter().find(|svc| {
360 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
361 || svc.get("type").and_then(|t| t.as_str())
362 == Some("AtprotoPersonalDataServer")
363 })
364 })
365 .and_then(|svc| svc.get("serviceEndpoint"))
366 .and_then(|e| e.as_str());
367
368 if pds_endpoint != Some(&expected_endpoint) {
369 warn!(
370 "DID {} has endpoint {:?}, expected {}",
371 did, pds_endpoint, expected_endpoint
372 );
373 return Err((
374 StatusCode::BAD_REQUEST,
375 Json(json!({
376 "error": "InvalidRequest",
377 "message": "DID document atproto_pds service endpoint does not match PDS public url"
378 })),
379 ));
380 }
381 }
382
383 Ok(())
384}
385
386pub async fn activate_account(
387 State(state): State<AppState>,
388 headers: axum::http::HeaderMap,
389) -> Response {
390 info!("[MIGRATION] activateAccount called");
391 let extracted = match crate::auth::extract_auth_token_from_header(
392 headers.get("Authorization").and_then(|h| h.to_str().ok()),
393 ) {
394 Some(t) => t,
395 None => {
396 info!("[MIGRATION] activateAccount: No auth token");
397 return ApiError::AuthenticationRequired.into_response();
398 }
399 };
400 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
401 let http_uri = format!(
402 "https://{}/xrpc/com.atproto.server.activateAccount",
403 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
404 );
405 let auth_user = match crate::auth::validate_token_with_dpop(
406 &state.db,
407 &extracted.token,
408 extracted.is_dpop,
409 dpop_proof,
410 "POST",
411 &http_uri,
412 true,
413 )
414 .await
415 {
416 Ok(user) => user,
417 Err(e) => {
418 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
419 return ApiError::from(e).into_response();
420 }
421 };
422 info!(
423 "[MIGRATION] activateAccount: Authenticated user did={}",
424 auth_user.did
425 );
426
427 if let Err(e) = crate::auth::scope_check::check_account_scope(
428 auth_user.is_oauth,
429 auth_user.scope.as_deref(),
430 crate::oauth::scopes::AccountAttr::Repo,
431 crate::oauth::scopes::AccountAction::Manage,
432 ) {
433 info!("[MIGRATION] activateAccount: Scope check failed");
434 return e;
435 }
436
437 let did = auth_user.did;
438
439 info!(
440 "[MIGRATION] activateAccount: Validating DID document for did={}",
441 did
442 );
443 let did_validation_start = std::time::Instant::now();
444 if let Err((status, json)) =
445 assert_valid_did_document_for_service(&state.db, &state.cache, &did, true).await
446 {
447 info!(
448 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
449 did,
450 did_validation_start.elapsed()
451 );
452 return (status, json).into_response();
453 }
454 info!(
455 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
456 did,
457 did_validation_start.elapsed()
458 );
459
460 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
461 .fetch_optional(&state.db)
462 .await
463 .ok()
464 .flatten();
465 info!(
466 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
467 did, handle
468 );
469 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
470 .execute(&state.db)
471 .await;
472 match result {
473 Ok(_) => {
474 info!(
475 "[MIGRATION] activateAccount: DB update success for did={}",
476 did
477 );
478 if let Some(ref h) = handle {
479 let _ = state.cache.delete(&format!("handle:{}", h)).await;
480 }
481 info!(
482 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
483 did
484 );
485 if let Err(e) =
486 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
487 {
488 warn!(
489 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
490 e
491 );
492 } else {
493 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
494 }
495 info!(
496 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
497 did, handle
498 );
499 if let Err(e) =
500 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
501 .await
502 {
503 warn!(
504 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
505 e
506 );
507 } else {
508 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
509 }
510 let repo_root = sqlx::query_scalar!(
511 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
512 did
513 )
514 .fetch_optional(&state.db)
515 .await
516 .ok()
517 .flatten();
518 if let Some(root_cid) = repo_root {
519 info!(
520 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
521 did, root_cid
522 );
523 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
524 if let Ok(Some(block)) = state.block_store.get(&cid).await {
525 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
526 } else {
527 None
528 }
529 } else {
530 None
531 };
532 if let Err(e) = crate::api::repo::record::sequence_sync_event(
533 &state,
534 &did,
535 &root_cid,
536 rev.as_deref(),
537 )
538 .await
539 {
540 warn!(
541 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
542 e
543 );
544 } else {
545 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
546 }
547 } else {
548 warn!(
549 "[MIGRATION] activateAccount: No repo root found for did={}",
550 did
551 );
552 }
553 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
554 (StatusCode::OK, Json(json!({}))).into_response()
555 }
556 Err(e) => {
557 error!(
558 "[MIGRATION] activateAccount: DB error activating account: {:?}",
559 e
560 );
561 (
562 StatusCode::INTERNAL_SERVER_ERROR,
563 Json(json!({"error": "InternalError"})),
564 )
565 .into_response()
566 }
567 }
568}
569
570#[derive(Deserialize)]
571#[serde(rename_all = "camelCase")]
572pub struct DeactivateAccountInput {
573 pub delete_after: Option<String>,
574}
575
576pub async fn deactivate_account(
577 State(state): State<AppState>,
578 headers: axum::http::HeaderMap,
579 Json(input): Json<DeactivateAccountInput>,
580) -> Response {
581 let extracted = match crate::auth::extract_auth_token_from_header(
582 headers.get("Authorization").and_then(|h| h.to_str().ok()),
583 ) {
584 Some(t) => t,
585 None => return ApiError::AuthenticationRequired.into_response(),
586 };
587 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
588 let http_uri = format!(
589 "https://{}/xrpc/com.atproto.server.deactivateAccount",
590 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
591 );
592 let auth_user = match crate::auth::validate_token_with_dpop(
593 &state.db,
594 &extracted.token,
595 extracted.is_dpop,
596 dpop_proof,
597 "POST",
598 &http_uri,
599 false,
600 )
601 .await
602 {
603 Ok(user) => user,
604 Err(e) => return ApiError::from(e).into_response(),
605 };
606
607 if let Err(e) = crate::auth::scope_check::check_account_scope(
608 auth_user.is_oauth,
609 auth_user.scope.as_deref(),
610 crate::oauth::scopes::AccountAttr::Repo,
611 crate::oauth::scopes::AccountAction::Manage,
612 ) {
613 return e;
614 }
615
616 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
617 .delete_after
618 .as_ref()
619 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
620 .map(|dt| dt.with_timezone(&chrono::Utc));
621
622 let did = auth_user.did;
623
624 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
625 .fetch_optional(&state.db)
626 .await
627 .ok()
628 .flatten();
629
630 let result = sqlx::query!(
631 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
632 did,
633 delete_after
634 )
635 .execute(&state.db)
636 .await;
637
638 match result {
639 Ok(_) => {
640 if let Some(ref h) = handle {
641 let _ = state.cache.delete(&format!("handle:{}", h)).await;
642 }
643 if let Err(e) = crate::api::repo::record::sequence_account_event(
644 &state,
645 &did,
646 false,
647 Some("deactivated"),
648 )
649 .await
650 {
651 warn!("Failed to sequence account deactivated event: {}", e);
652 }
653 (StatusCode::OK, Json(json!({}))).into_response()
654 }
655 Err(e) => {
656 error!("DB error deactivating account: {:?}", e);
657 (
658 StatusCode::INTERNAL_SERVER_ERROR,
659 Json(json!({"error": "InternalError"})),
660 )
661 .into_response()
662 }
663 }
664}
665
666pub async fn request_account_delete(
667 State(state): State<AppState>,
668 headers: axum::http::HeaderMap,
669) -> Response {
670 let extracted = match crate::auth::extract_auth_token_from_header(
671 headers.get("Authorization").and_then(|h| h.to_str().ok()),
672 ) {
673 Some(t) => t,
674 None => return ApiError::AuthenticationRequired.into_response(),
675 };
676 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
677 let http_uri = format!(
678 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
679 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
680 );
681 let validated = match crate::auth::validate_token_with_dpop(
682 &state.db,
683 &extracted.token,
684 extracted.is_dpop,
685 dpop_proof,
686 "POST",
687 &http_uri,
688 true,
689 )
690 .await
691 {
692 Ok(user) => user,
693 Err(e) => return ApiError::from(e).into_response(),
694 };
695 let did = validated.did.clone();
696
697 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await {
698 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await;
699 }
700
701 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
702 .fetch_optional(&state.db)
703 .await
704 {
705 Ok(Some(id)) => id,
706 _ => {
707 return (
708 StatusCode::INTERNAL_SERVER_ERROR,
709 Json(json!({"error": "InternalError"})),
710 )
711 .into_response();
712 }
713 };
714 let confirmation_token = Uuid::new_v4().to_string();
715 let expires_at = Utc::now() + Duration::minutes(15);
716 let insert = sqlx::query!(
717 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
718 confirmation_token,
719 did,
720 expires_at
721 )
722 .execute(&state.db)
723 .await;
724 if let Err(e) = insert {
725 error!("DB error creating deletion token: {:?}", e);
726 return (
727 StatusCode::INTERNAL_SERVER_ERROR,
728 Json(json!({"error": "InternalError"})),
729 )
730 .into_response();
731 }
732 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
733 if let Err(e) =
734 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
735 .await
736 {
737 warn!("Failed to enqueue account deletion notification: {:?}", e);
738 }
739 info!("Account deletion requested for user {}", did);
740 (StatusCode::OK, Json(json!({}))).into_response()
741}
742
743#[derive(Deserialize)]
744pub struct DeleteAccountInput {
745 pub did: String,
746 pub password: String,
747 pub token: String,
748}
749
750pub async fn delete_account(
751 State(state): State<AppState>,
752 Json(input): Json<DeleteAccountInput>,
753) -> Response {
754 let did = input.did.trim();
755 let password = &input.password;
756 let token = input.token.trim();
757 if did.is_empty() {
758 return (
759 StatusCode::BAD_REQUEST,
760 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
761 )
762 .into_response();
763 }
764 if password.is_empty() {
765 return (
766 StatusCode::BAD_REQUEST,
767 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
768 )
769 .into_response();
770 }
771 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
772 if password.len() > OLD_PASSWORD_MAX_LENGTH {
773 return (
774 StatusCode::BAD_REQUEST,
775 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})),
776 )
777 .into_response();
778 }
779 if token.is_empty() {
780 return (
781 StatusCode::BAD_REQUEST,
782 Json(json!({"error": "InvalidToken", "message": "token is required"})),
783 )
784 .into_response();
785 }
786 let user = sqlx::query!(
787 "SELECT id, password_hash, handle FROM users WHERE did = $1",
788 did
789 )
790 .fetch_optional(&state.db)
791 .await;
792 let (user_id, password_hash, handle) = match user {
793 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
794 Ok(None) => {
795 return (
796 StatusCode::BAD_REQUEST,
797 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
798 )
799 .into_response();
800 }
801 Err(e) => {
802 error!("DB error in delete_account: {:?}", e);
803 return (
804 StatusCode::INTERNAL_SERVER_ERROR,
805 Json(json!({"error": "InternalError"})),
806 )
807 .into_response();
808 }
809 };
810 let password_valid = if password_hash
811 .as_ref()
812 .map(|h| verify(password, h).unwrap_or(false))
813 .unwrap_or(false)
814 {
815 true
816 } else {
817 let app_pass_rows = sqlx::query!(
818 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
819 user_id
820 )
821 .fetch_all(&state.db)
822 .await
823 .unwrap_or_default();
824 app_pass_rows
825 .iter()
826 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
827 };
828 if !password_valid {
829 return (
830 StatusCode::UNAUTHORIZED,
831 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
832 )
833 .into_response();
834 }
835 let deletion_request = sqlx::query!(
836 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
837 token
838 )
839 .fetch_optional(&state.db)
840 .await;
841 let (token_did, expires_at) = match deletion_request {
842 Ok(Some(row)) => (row.did, row.expires_at),
843 Ok(None) => {
844 return (
845 StatusCode::BAD_REQUEST,
846 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
847 )
848 .into_response();
849 }
850 Err(e) => {
851 error!("DB error fetching deletion token: {:?}", e);
852 return (
853 StatusCode::INTERNAL_SERVER_ERROR,
854 Json(json!({"error": "InternalError"})),
855 )
856 .into_response();
857 }
858 };
859 if token_did != did {
860 return (
861 StatusCode::BAD_REQUEST,
862 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
863 )
864 .into_response();
865 }
866 if Utc::now() > expires_at {
867 let _ = sqlx::query!(
868 "DELETE FROM account_deletion_requests WHERE token = $1",
869 token
870 )
871 .execute(&state.db)
872 .await;
873 return (
874 StatusCode::BAD_REQUEST,
875 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
876 )
877 .into_response();
878 }
879 let mut tx = match state.db.begin().await {
880 Ok(tx) => tx,
881 Err(e) => {
882 error!("Failed to begin transaction: {:?}", e);
883 return (
884 StatusCode::INTERNAL_SERVER_ERROR,
885 Json(json!({"error": "InternalError"})),
886 )
887 .into_response();
888 }
889 };
890 let deletion_result: Result<(), sqlx::Error> = async {
891 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
892 .execute(&mut *tx)
893 .await?;
894 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
895 .execute(&mut *tx)
896 .await?;
897 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
898 .execute(&mut *tx)
899 .await?;
900 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
901 .execute(&mut *tx)
902 .await?;
903 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
904 .execute(&mut *tx)
905 .await?;
906 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
907 .execute(&mut *tx)
908 .await?;
909 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
910 .execute(&mut *tx)
911 .await?;
912 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
913 .execute(&mut *tx)
914 .await?;
915 Ok(())
916 }
917 .await;
918 match deletion_result {
919 Ok(()) => {
920 if let Err(e) = tx.commit().await {
921 error!("Failed to commit account deletion transaction: {:?}", e);
922 return (
923 StatusCode::INTERNAL_SERVER_ERROR,
924 Json(json!({"error": "InternalError"})),
925 )
926 .into_response();
927 }
928 let account_seq = crate::api::repo::record::sequence_account_event(
929 &state,
930 did,
931 false,
932 Some("deleted"),
933 )
934 .await;
935 match account_seq {
936 Ok(seq) => {
937 if let Err(e) = sqlx::query!(
938 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
939 did,
940 seq
941 )
942 .execute(&state.db)
943 .await
944 {
945 warn!(
946 "Failed to cleanup sequences for deleted account {}: {}",
947 did, e
948 );
949 }
950 }
951 Err(e) => {
952 warn!(
953 "Failed to sequence account deletion event for {}: {}",
954 did, e
955 );
956 }
957 }
958 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
959 info!("Account {} deleted successfully", did);
960 (StatusCode::OK, Json(json!({}))).into_response()
961 }
962 Err(e) => {
963 error!("DB error deleting account, rolling back: {:?}", e);
964 (
965 StatusCode::INTERNAL_SERVER_ERROR,
966 Json(json!({"error": "InternalError"})),
967 )
968 .into_response()
969 }
970 }
971}