this repo has no description
1use crate::api::EmptyResponse;
2use crate::api::error::ApiError;
3use crate::cache::Cache;
4use crate::plc::PlcClient;
5use crate::state::AppState;
6use crate::types::{Handle, PlainPassword};
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use backon::{ExponentialBuilder, Retryable};
14use bcrypt::verify;
15use chrono::{Duration, Utc};
16use cid::Cid;
17use jacquard_repo::commit::Commit;
18use jacquard_repo::storage::BlockStore;
19use k256::ecdsa::SigningKey;
20use serde::{Deserialize, Serialize};
21use std::str::FromStr;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use tracing::{error, info, warn};
25use uuid::Uuid;
26
27#[derive(Serialize)]
28#[serde(rename_all = "camelCase")]
29pub struct CheckAccountStatusOutput {
30 pub activated: bool,
31 pub valid_did: bool,
32 pub repo_commit: String,
33 pub repo_rev: String,
34 pub repo_blocks: i64,
35 pub indexed_records: i64,
36 pub private_state_values: i64,
37 pub expected_blobs: i64,
38 pub imported_blobs: i64,
39}
40
41pub async fn check_account_status(
42 State(state): State<AppState>,
43 headers: axum::http::HeaderMap,
44) -> Response {
45 let extracted = match crate::auth::extract_auth_token_from_header(
46 headers.get("Authorization").and_then(|h| h.to_str().ok()),
47 ) {
48 Some(t) => t,
49 None => return ApiError::AuthenticationRequired.into_response(),
50 };
51 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
52 let http_uri = format!(
53 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
54 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
55 );
56 let did = match crate::auth::validate_token_with_dpop(
57 &state.db,
58 &extracted.token,
59 extracted.is_dpop,
60 dpop_proof,
61 "GET",
62 &http_uri,
63 true,
64 false,
65 )
66 .await
67 {
68 Ok(user) => user.did,
69 Err(e) => return ApiError::from(e).into_response(),
70 };
71 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
72 .fetch_optional(&state.db)
73 .await
74 {
75 Ok(Some(id)) => id,
76 _ => {
77 return ApiError::InternalError(None).into_response();
78 }
79 };
80 let user_status = sqlx::query!(
81 "SELECT deactivated_at FROM users WHERE did = $1",
82 did.as_str()
83 )
84 .fetch_optional(&state.db)
85 .await;
86 let deactivated_at = match user_status {
87 Ok(Some(row)) => row.deactivated_at,
88 _ => None,
89 };
90 let repo_result = sqlx::query!(
91 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
92 user_id
93 )
94 .fetch_optional(&state.db)
95 .await;
96 let (repo_commit, repo_rev_from_db) = match repo_result {
97 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
98 _ => (String::new(), None),
99 };
100 let block_count: i64 = sqlx::query_scalar!(
101 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
102 user_id
103 )
104 .fetch_one(&state.db)
105 .await
106 .unwrap_or(Some(0))
107 .unwrap_or(0);
108 let repo_rev = if let Some(rev) = repo_rev_from_db {
109 rev
110 } else if !repo_commit.is_empty() {
111 if let Ok(cid) = Cid::from_str(&repo_commit) {
112 if let Ok(Some(block)) = state.block_store.get(&cid).await {
113 Commit::from_cbor(&block)
114 .ok()
115 .map(|c| c.rev().to_string())
116 .unwrap_or_default()
117 } else {
118 String::new()
119 }
120 } else {
121 String::new()
122 }
123 } else {
124 String::new()
125 };
126 let record_count: i64 =
127 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
128 .fetch_one(&state.db)
129 .await
130 .unwrap_or(Some(0))
131 .unwrap_or(0);
132 let imported_blobs: i64 = sqlx::query_scalar!(
133 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
134 user_id
135 )
136 .fetch_one(&state.db)
137 .await
138 .unwrap_or(Some(0))
139 .unwrap_or(0);
140 let expected_blobs: i64 = sqlx::query_scalar!(
141 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
142 user_id
143 )
144 .fetch_one(&state.db)
145 .await
146 .unwrap_or(Some(0))
147 .unwrap_or(0);
148 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await;
149 (
150 StatusCode::OK,
151 Json(CheckAccountStatusOutput {
152 activated: deactivated_at.is_none(),
153 valid_did,
154 repo_commit: repo_commit.clone(),
155 repo_rev,
156 repo_blocks: block_count as i64,
157 indexed_records: record_count,
158 private_state_values: 0,
159 expected_blobs,
160 imported_blobs,
161 }),
162 )
163 .into_response()
164}
165
166async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool {
167 assert_valid_did_document_for_service(db, cache, did, false)
168 .await
169 .is_ok()
170}
171
172async fn assert_valid_did_document_for_service(
173 db: &sqlx::PgPool,
174 cache: Arc<dyn Cache>,
175 did: &str,
176 with_retry: bool,
177) -> Result<(), ApiError> {
178 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
179 let expected_endpoint = format!("https://{}", hostname);
180
181 if did.starts_with("did:plc:") {
182 let max_attempts = if with_retry { 5 } else { 1 };
183 let cache_for_retry = cache.clone();
184 let did_owned = did.to_string();
185 let expected_owned = expected_endpoint.clone();
186 let attempt_counter = Arc::new(AtomicUsize::new(0));
187
188 let doc_data: serde_json::Value = (|| {
189 let cache_ref = cache_for_retry.clone();
190 let did_ref = did_owned.clone();
191 let expected_ref = expected_owned.clone();
192 let counter = attempt_counter.clone();
193 async move {
194 let attempt = counter.fetch_add(1, Ordering::SeqCst);
195 if attempt > 0 {
196 info!(
197 "Retry {} for DID document validation ({})",
198 attempt, did_ref
199 );
200 }
201 let plc_client = PlcClient::with_cache(None, Some(cache_ref));
202 match plc_client.get_document_data(&did_ref).await {
203 Ok(data) => {
204 let pds_endpoint = data
205 .get("services")
206 .and_then(|s: &serde_json::Value| {
207 s.get("atproto_pds").or_else(|| s.get("atprotoPds"))
208 })
209 .and_then(|p: &serde_json::Value| p.get("endpoint"))
210 .and_then(|e: &serde_json::Value| e.as_str());
211
212 if pds_endpoint == Some(expected_ref.as_str()) {
213 Ok(data)
214 } else {
215 info!(
216 "Attempt {}: DID {} has endpoint {:?}, expected {}",
217 attempt + 1,
218 did_ref,
219 pds_endpoint,
220 expected_ref
221 );
222 Err(format!(
223 "DID document endpoint {:?} does not match expected {}",
224 pds_endpoint, expected_ref
225 ))
226 }
227 }
228 Err(e) => {
229 warn!(
230 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
231 attempt + 1,
232 did_ref,
233 e
234 );
235 Err(format!("Could not resolve DID document: {}", e))
236 }
237 }
238 }
239 })
240 .retry(
241 ExponentialBuilder::default()
242 .with_min_delay(std::time::Duration::from_millis(500))
243 .with_max_times(max_attempts),
244 )
245 .await
246 .map_err(ApiError::InvalidRequest)?;
247
248 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
249 if let Some(ref expected_rotation_key) = server_rotation_key {
250 let rotation_keys = doc_data
251 .get("rotationKeys")
252 .and_then(|v| v.as_array())
253 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
254 .unwrap_or_default();
255 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
256 return Err(ApiError::InvalidRequest(
257 "Server rotation key not included in PLC DID data".into(),
258 ));
259 }
260 }
261
262 let doc_signing_key = doc_data
263 .get("verificationMethods")
264 .and_then(|v| v.get("atproto"))
265 .and_then(|k| k.as_str());
266
267 let user_row = sqlx::query!(
268 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
269 did
270 )
271 .fetch_optional(db)
272 .await
273 .map_err(|e| {
274 error!("Failed to fetch user key: {:?}", e);
275 ApiError::InternalError(None)
276 })?;
277
278 if let Some(row) = user_row {
279 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
280 .map_err(|e| {
281 error!("Failed to decrypt user key: {}", e);
282 ApiError::InternalError(None)
283 })?;
284 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
285 error!("Failed to create signing key: {:?}", e);
286 ApiError::InternalError(None)
287 })?;
288 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
289
290 if doc_signing_key != Some(&expected_did_key) {
291 warn!(
292 "DID {} has signing key {:?}, expected {}",
293 did, doc_signing_key, expected_did_key
294 );
295 return Err(ApiError::InvalidRequest(
296 "DID document verification method does not match expected signing key".into(),
297 ));
298 }
299 }
300 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
301 let client = crate::api::proxy_client::did_resolution_client();
302 let decoded = host_and_path.replace("%3A", ":");
303 let parts: Vec<&str> = decoded.split(':').collect();
304 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
305 {
306 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
307 } else {
308 (parts[0].to_string(), parts[1..].to_vec())
309 };
310 let scheme =
311 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
312 "http"
313 } else {
314 "https"
315 };
316 let url = if path_parts.is_empty() {
317 format!("{}://{}/.well-known/did.json", scheme, host)
318 } else {
319 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
320 };
321 let resp = client.get(&url).send().await.map_err(|e| {
322 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
323 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e))
324 })?;
325 let doc: serde_json::Value = resp.json().await.map_err(|e| {
326 warn!("Failed to parse did:web document for {}: {:?}", did, e);
327 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e))
328 })?;
329
330 let pds_endpoint = doc
331 .get("service")
332 .and_then(|s| s.as_array())
333 .and_then(|arr| {
334 arr.iter().find(|svc| {
335 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
336 || svc.get("type").and_then(|t| t.as_str())
337 == Some("AtprotoPersonalDataServer")
338 })
339 })
340 .and_then(|svc| svc.get("serviceEndpoint"))
341 .and_then(|e| e.as_str());
342
343 if pds_endpoint != Some(&expected_endpoint) {
344 warn!(
345 "DID {} has endpoint {:?}, expected {}",
346 did, pds_endpoint, expected_endpoint
347 );
348 return Err(ApiError::InvalidRequest(
349 "DID document atproto_pds service endpoint does not match PDS public url".into(),
350 ));
351 }
352 }
353
354 Ok(())
355}
356
357pub async fn activate_account(
358 State(state): State<AppState>,
359 headers: axum::http::HeaderMap,
360) -> Response {
361 info!("[MIGRATION] activateAccount called");
362 let extracted = match crate::auth::extract_auth_token_from_header(
363 headers.get("Authorization").and_then(|h| h.to_str().ok()),
364 ) {
365 Some(t) => t,
366 None => {
367 info!("[MIGRATION] activateAccount: No auth token");
368 return ApiError::AuthenticationRequired.into_response();
369 }
370 };
371 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
372 let http_uri = format!(
373 "https://{}/xrpc/com.atproto.server.activateAccount",
374 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
375 );
376 let auth_user = match crate::auth::validate_token_with_dpop(
377 &state.db,
378 &extracted.token,
379 extracted.is_dpop,
380 dpop_proof,
381 "POST",
382 &http_uri,
383 true,
384 false,
385 )
386 .await
387 {
388 Ok(user) => user,
389 Err(e) => {
390 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
391 return ApiError::from(e).into_response();
392 }
393 };
394 info!(
395 "[MIGRATION] activateAccount: Authenticated user did={}",
396 auth_user.did
397 );
398
399 if let Err(e) = crate::auth::scope_check::check_account_scope(
400 auth_user.is_oauth,
401 auth_user.scope.as_deref(),
402 crate::oauth::scopes::AccountAttr::Repo,
403 crate::oauth::scopes::AccountAction::Manage,
404 ) {
405 info!("[MIGRATION] activateAccount: Scope check failed");
406 return e;
407 }
408
409 let did = auth_user.did;
410
411 info!(
412 "[MIGRATION] activateAccount: Validating DID document for did={}",
413 did
414 );
415 let did_validation_start = std::time::Instant::now();
416 if let Err(e) =
417 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true)
418 .await
419 {
420 info!(
421 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
422 did,
423 did_validation_start.elapsed()
424 );
425 return e.into_response();
426 }
427 info!(
428 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
429 did,
430 did_validation_start.elapsed()
431 );
432
433 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
434 .fetch_optional(&state.db)
435 .await
436 .ok()
437 .flatten();
438 info!(
439 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
440 did, handle
441 );
442 let result = sqlx::query!(
443 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
444 did.as_str()
445 )
446 .execute(&state.db)
447 .await;
448 match result {
449 Ok(_) => {
450 info!(
451 "[MIGRATION] activateAccount: DB update success for did={}",
452 did
453 );
454 if let Some(ref h) = handle {
455 let _ = state.cache.delete(&format!("handle:{}", h)).await;
456 }
457 info!(
458 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
459 did
460 );
461 if let Err(e) =
462 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
463 {
464 warn!(
465 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
466 e
467 );
468 } else {
469 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
470 }
471 info!(
472 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
473 did, handle
474 );
475 let handle_typed = handle.as_ref().map(Handle::new_unchecked);
476 if let Err(e) = crate::api::repo::record::sequence_identity_event(
477 &state,
478 &did,
479 handle_typed.as_ref(),
480 )
481 .await
482 {
483 warn!(
484 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
485 e
486 );
487 } else {
488 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
489 }
490 let repo_root = sqlx::query_scalar!(
491 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
492 did.as_str()
493 )
494 .fetch_optional(&state.db)
495 .await
496 .ok()
497 .flatten();
498 if let Some(root_cid) = repo_root {
499 info!(
500 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
501 did, root_cid
502 );
503 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
504 if let Ok(Some(block)) = state.block_store.get(&cid).await {
505 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
506 } else {
507 None
508 }
509 } else {
510 None
511 };
512 if let Err(e) = crate::api::repo::record::sequence_sync_event(
513 &state,
514 &did,
515 &root_cid,
516 rev.as_deref(),
517 )
518 .await
519 {
520 warn!(
521 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
522 e
523 );
524 } else {
525 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
526 }
527 } else {
528 warn!(
529 "[MIGRATION] activateAccount: No repo root found for did={}",
530 did
531 );
532 }
533 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
534 EmptyResponse::ok().into_response()
535 }
536 Err(e) => {
537 error!(
538 "[MIGRATION] activateAccount: DB error activating account: {:?}",
539 e
540 );
541 ApiError::InternalError(None).into_response()
542 }
543 }
544}
545
546#[derive(Deserialize)]
547#[serde(rename_all = "camelCase")]
548pub struct DeactivateAccountInput {
549 pub delete_after: Option<String>,
550}
551
552pub async fn deactivate_account(
553 State(state): State<AppState>,
554 headers: axum::http::HeaderMap,
555 Json(input): Json<DeactivateAccountInput>,
556) -> Response {
557 let extracted = match crate::auth::extract_auth_token_from_header(
558 headers.get("Authorization").and_then(|h| h.to_str().ok()),
559 ) {
560 Some(t) => t,
561 None => return ApiError::AuthenticationRequired.into_response(),
562 };
563 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
564 let http_uri = format!(
565 "https://{}/xrpc/com.atproto.server.deactivateAccount",
566 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
567 );
568 let auth_user = match crate::auth::validate_token_with_dpop(
569 &state.db,
570 &extracted.token,
571 extracted.is_dpop,
572 dpop_proof,
573 "POST",
574 &http_uri,
575 false,
576 false,
577 )
578 .await
579 {
580 Ok(user) => user,
581 Err(e) => return ApiError::from(e).into_response(),
582 };
583
584 if let Err(e) = crate::auth::scope_check::check_account_scope(
585 auth_user.is_oauth,
586 auth_user.scope.as_deref(),
587 crate::oauth::scopes::AccountAttr::Repo,
588 crate::oauth::scopes::AccountAction::Manage,
589 ) {
590 return e;
591 }
592
593 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
594 .delete_after
595 .as_ref()
596 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
597 .map(|dt| dt.with_timezone(&chrono::Utc));
598
599 let did = auth_user.did;
600
601 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
602 .fetch_optional(&state.db)
603 .await
604 .ok()
605 .flatten();
606
607 let result = sqlx::query!(
608 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
609 did.as_str(),
610 delete_after
611 )
612 .execute(&state.db)
613 .await;
614
615 match result {
616 Ok(_) => {
617 if let Some(ref h) = handle {
618 let _ = state.cache.delete(&format!("handle:{}", h)).await;
619 }
620 if let Err(e) = crate::api::repo::record::sequence_account_event(
621 &state,
622 &did,
623 false,
624 Some("deactivated"),
625 )
626 .await
627 {
628 warn!("Failed to sequence account deactivated event: {}", e);
629 }
630 EmptyResponse::ok().into_response()
631 }
632 Err(e) => {
633 error!("DB error deactivating account: {:?}", e);
634 ApiError::InternalError(None).into_response()
635 }
636 }
637}
638
639pub async fn request_account_delete(
640 State(state): State<AppState>,
641 headers: axum::http::HeaderMap,
642) -> Response {
643 let extracted = match crate::auth::extract_auth_token_from_header(
644 headers.get("Authorization").and_then(|h| h.to_str().ok()),
645 ) {
646 Some(t) => t,
647 None => return ApiError::AuthenticationRequired.into_response(),
648 };
649 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
650 let http_uri = format!(
651 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
652 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
653 );
654 let validated = match crate::auth::validate_token_with_dpop(
655 &state.db,
656 &extracted.token,
657 extracted.is_dpop,
658 dpop_proof,
659 "POST",
660 &http_uri,
661 true,
662 false,
663 )
664 .await
665 {
666 Ok(user) => user,
667 Err(e) => return ApiError::from(e).into_response(),
668 };
669 let did = validated.did.clone();
670
671 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await {
672 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str())
673 .await;
674 }
675
676 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
677 .fetch_optional(&state.db)
678 .await
679 {
680 Ok(Some(id)) => id,
681 _ => {
682 return ApiError::InternalError(None).into_response();
683 }
684 };
685 let confirmation_token = Uuid::new_v4().to_string();
686 let expires_at = Utc::now() + Duration::minutes(15);
687 let insert = sqlx::query!(
688 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
689 confirmation_token,
690 did.as_str(),
691 expires_at
692 )
693 .execute(&state.db)
694 .await;
695 if let Err(e) = insert {
696 error!("DB error creating deletion token: {:?}", e);
697 return ApiError::InternalError(None).into_response();
698 }
699 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
700 if let Err(e) =
701 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
702 .await
703 {
704 warn!("Failed to enqueue account deletion notification: {:?}", e);
705 }
706 info!("Account deletion requested for user {}", did);
707 EmptyResponse::ok().into_response()
708}
709
710#[derive(Deserialize)]
711pub struct DeleteAccountInput {
712 pub did: crate::types::Did,
713 pub password: PlainPassword,
714 pub token: String,
715}
716
717pub async fn delete_account(
718 State(state): State<AppState>,
719 Json(input): Json<DeleteAccountInput>,
720) -> Response {
721 let did = &input.did;
722 let password = &input.password;
723 let token = input.token.trim();
724 if password.is_empty() {
725 return ApiError::InvalidRequest("password is required".into()).into_response();
726 }
727 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
728 if password.len() > OLD_PASSWORD_MAX_LENGTH {
729 return ApiError::InvalidRequest("Invalid password length".into()).into_response();
730 }
731 if token.is_empty() {
732 return ApiError::InvalidToken(Some("token is required".into())).into_response();
733 }
734 let user = sqlx::query!(
735 "SELECT id, password_hash, handle FROM users WHERE did = $1",
736 did.as_str()
737 )
738 .fetch_optional(&state.db)
739 .await;
740 let (user_id, password_hash, handle) = match user {
741 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
742 Ok(None) => {
743 return ApiError::InvalidRequest("account not found".into()).into_response();
744 }
745 Err(e) => {
746 error!("DB error in delete_account: {:?}", e);
747 return ApiError::InternalError(None).into_response();
748 }
749 };
750 let password_valid = if password_hash
751 .as_ref()
752 .map(|h| verify(password, h).unwrap_or(false))
753 .unwrap_or(false)
754 {
755 true
756 } else {
757 let app_pass_rows = sqlx::query!(
758 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
759 user_id
760 )
761 .fetch_all(&state.db)
762 .await
763 .unwrap_or_default();
764 app_pass_rows
765 .iter()
766 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
767 };
768 if !password_valid {
769 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response();
770 }
771 let deletion_request = sqlx::query!(
772 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
773 token
774 )
775 .fetch_optional(&state.db)
776 .await;
777 let (token_did, expires_at) = match deletion_request {
778 Ok(Some(row)) => (row.did, row.expires_at),
779 Ok(None) => {
780 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response();
781 }
782 Err(e) => {
783 error!("DB error fetching deletion token: {:?}", e);
784 return ApiError::InternalError(None).into_response();
785 }
786 };
787 if token_did != did.as_str() {
788 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response();
789 }
790 if Utc::now() > expires_at {
791 let _ = sqlx::query!(
792 "DELETE FROM account_deletion_requests WHERE token = $1",
793 token
794 )
795 .execute(&state.db)
796 .await;
797 return ApiError::ExpiredToken(None).into_response();
798 }
799 let mut tx = match state.db.begin().await {
800 Ok(tx) => tx,
801 Err(e) => {
802 error!("Failed to begin transaction: {:?}", e);
803 return ApiError::InternalError(None).into_response();
804 }
805 };
806 let deletion_result: Result<(), sqlx::Error> = async {
807 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
808 .execute(&mut *tx)
809 .await?;
810 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
811 .execute(&mut *tx)
812 .await?;
813 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
814 .execute(&mut *tx)
815 .await?;
816 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
817 .execute(&mut *tx)
818 .await?;
819 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
820 .execute(&mut *tx)
821 .await?;
822 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
823 .execute(&mut *tx)
824 .await?;
825 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
826 .execute(&mut *tx)
827 .await?;
828 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
829 .execute(&mut *tx)
830 .await?;
831 Ok(())
832 }
833 .await;
834 match deletion_result {
835 Ok(()) => {
836 if let Err(e) = tx.commit().await {
837 error!("Failed to commit account deletion transaction: {:?}", e);
838 return ApiError::InternalError(None).into_response();
839 }
840 let account_seq = crate::api::repo::record::sequence_account_event(
841 &state,
842 did,
843 false,
844 Some("deleted"),
845 )
846 .await;
847 match account_seq {
848 Ok(seq) => {
849 if let Err(e) = sqlx::query!(
850 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
851 did,
852 seq
853 )
854 .execute(&state.db)
855 .await
856 {
857 warn!(
858 "Failed to cleanup sequences for deleted account {}: {}",
859 did, e
860 );
861 }
862 }
863 Err(e) => {
864 warn!(
865 "Failed to sequence account deletion event for {}: {}",
866 did, e
867 );
868 }
869 }
870 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
871 info!("Account {} deleted successfully", did);
872 EmptyResponse::ok().into_response()
873 }
874 Err(e) => {
875 error!("DB error deleting account, rolling back: {:?}", e);
876 ApiError::InternalError(None).into_response()
877 }
878 }
879}