// this repo has no description
1use crate::api::EmptyResponse;
2use crate::api::error::ApiError;
3use crate::cache::Cache;
4use crate::plc::PlcClient;
5use crate::state::AppState;
6use crate::types::PlainPassword;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bcrypt::verify;
14use chrono::{Duration, Utc};
15use cid::Cid;
16use jacquard_repo::commit::Commit;
17use jacquard_repo::storage::BlockStore;
18use k256::ecdsa::SigningKey;
19use serde::{Deserialize, Serialize};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::{error, info, warn};
23use uuid::Uuid;
24
25#[derive(Serialize)]
26#[serde(rename_all = "camelCase")]
27pub struct CheckAccountStatusOutput {
28 pub activated: bool,
29 pub valid_did: bool,
30 pub repo_commit: String,
31 pub repo_rev: String,
32 pub repo_blocks: i64,
33 pub indexed_records: i64,
34 pub private_state_values: i64,
35 pub expected_blobs: i64,
36 pub imported_blobs: i64,
37}
38
39pub async fn check_account_status(
40 State(state): State<AppState>,
41 headers: axum::http::HeaderMap,
42) -> Response {
43 let extracted = match crate::auth::extract_auth_token_from_header(
44 headers.get("Authorization").and_then(|h| h.to_str().ok()),
45 ) {
46 Some(t) => t,
47 None => return ApiError::AuthenticationRequired.into_response(),
48 };
49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
50 let http_uri = format!(
51 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
53 );
54 let did = match crate::auth::validate_token_with_dpop(
55 &state.db,
56 &extracted.token,
57 extracted.is_dpop,
58 dpop_proof,
59 "GET",
60 &http_uri,
61 true,
62 )
63 .await
64 {
65 Ok(user) => user.did,
66 Err(e) => return ApiError::from(e).into_response(),
67 };
68 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
69 .fetch_optional(&state.db)
70 .await
71 {
72 Ok(Some(id)) => id,
73 _ => {
74 return ApiError::InternalError(None).into_response();
75 }
76 };
77 let user_status = sqlx::query!(
78 "SELECT deactivated_at FROM users WHERE did = $1",
79 did.as_str()
80 )
81 .fetch_optional(&state.db)
82 .await;
83 let deactivated_at = match user_status {
84 Ok(Some(row)) => row.deactivated_at,
85 _ => None,
86 };
87 let repo_result = sqlx::query!(
88 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
89 user_id
90 )
91 .fetch_optional(&state.db)
92 .await;
93 let (repo_commit, repo_rev_from_db) = match repo_result {
94 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
95 _ => (String::new(), None),
96 };
97 let block_count: i64 = sqlx::query_scalar!(
98 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
99 user_id
100 )
101 .fetch_one(&state.db)
102 .await
103 .unwrap_or(Some(0))
104 .unwrap_or(0);
105 let repo_rev = if let Some(rev) = repo_rev_from_db {
106 rev
107 } else if !repo_commit.is_empty() {
108 if let Ok(cid) = Cid::from_str(&repo_commit) {
109 if let Ok(Some(block)) = state.block_store.get(&cid).await {
110 Commit::from_cbor(&block)
111 .ok()
112 .map(|c| c.rev().to_string())
113 .unwrap_or_default()
114 } else {
115 String::new()
116 }
117 } else {
118 String::new()
119 }
120 } else {
121 String::new()
122 };
123 let record_count: i64 =
124 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
125 .fetch_one(&state.db)
126 .await
127 .unwrap_or(Some(0))
128 .unwrap_or(0);
129 let imported_blobs: i64 = sqlx::query_scalar!(
130 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
131 user_id
132 )
133 .fetch_one(&state.db)
134 .await
135 .unwrap_or(Some(0))
136 .unwrap_or(0);
137 let expected_blobs: i64 = sqlx::query_scalar!(
138 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
139 user_id
140 )
141 .fetch_one(&state.db)
142 .await
143 .unwrap_or(Some(0))
144 .unwrap_or(0);
145 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await;
146 (
147 StatusCode::OK,
148 Json(CheckAccountStatusOutput {
149 activated: deactivated_at.is_none(),
150 valid_did,
151 repo_commit: repo_commit.clone(),
152 repo_rev,
153 repo_blocks: block_count as i64,
154 indexed_records: record_count,
155 private_state_values: 0,
156 expected_blobs,
157 imported_blobs,
158 }),
159 )
160 .into_response()
161}
162
163async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool {
164 assert_valid_did_document_for_service(db, cache, did, false)
165 .await
166 .is_ok()
167}
168
169async fn assert_valid_did_document_for_service(
170 db: &sqlx::PgPool,
171 cache: Arc<dyn Cache>,
172 did: &str,
173 with_retry: bool,
174) -> Result<(), ApiError> {
175 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let expected_endpoint = format!("https://{}", hostname);
177
178 if did.starts_with("did:plc:") {
179 let plc_client = PlcClient::with_cache(None, Some(cache.clone()));
180
181 let max_attempts = if with_retry { 5 } else { 1 };
182 let mut last_error = None;
183 let mut doc_data = None;
184 for attempt in 0..max_attempts {
185 if attempt > 0 {
186 let delay_ms = 500 * (1 << (attempt - 1));
187 info!(
188 "Waiting {}ms before retry {} for DID document validation ({})",
189 delay_ms, attempt, did
190 );
191 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
192 }
193
194 match plc_client.get_document_data(did).await {
195 Ok(data) => {
196 let pds_endpoint = data
197 .get("services")
198 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
199 .and_then(|p| p.get("endpoint"))
200 .and_then(|e| e.as_str());
201
202 if pds_endpoint == Some(&expected_endpoint) {
203 doc_data = Some(data);
204 break;
205 } else {
206 info!(
207 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
208 attempt + 1,
209 did,
210 pds_endpoint,
211 expected_endpoint
212 );
213 last_error = Some(format!(
214 "DID document endpoint {:?} does not match expected {}",
215 pds_endpoint, expected_endpoint
216 ));
217 }
218 }
219 Err(e) => {
220 warn!(
221 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
222 attempt + 1,
223 did,
224 e
225 );
226 last_error = Some(format!("Could not resolve DID document: {}", e));
227 }
228 }
229 }
230
231 let Some(doc_data) = doc_data else {
232 return Err(ApiError::InvalidRequest(
233 last_error.unwrap_or_else(|| "DID document validation failed".to_string()),
234 ));
235 };
236
237 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
238 if let Some(ref expected_rotation_key) = server_rotation_key {
239 let rotation_keys = doc_data
240 .get("rotationKeys")
241 .and_then(|v| v.as_array())
242 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
243 .unwrap_or_default();
244 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
245 return Err(ApiError::InvalidRequest(
246 "Server rotation key not included in PLC DID data".into(),
247 ));
248 }
249 }
250
251 let doc_signing_key = doc_data
252 .get("verificationMethods")
253 .and_then(|v| v.get("atproto"))
254 .and_then(|k| k.as_str());
255
256 let user_row = sqlx::query!(
257 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
258 did
259 )
260 .fetch_optional(db)
261 .await
262 .map_err(|e| {
263 error!("Failed to fetch user key: {:?}", e);
264 ApiError::InternalError(None)
265 })?;
266
267 if let Some(row) = user_row {
268 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
269 .map_err(|e| {
270 error!("Failed to decrypt user key: {}", e);
271 ApiError::InternalError(None)
272 })?;
273 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
274 error!("Failed to create signing key: {:?}", e);
275 ApiError::InternalError(None)
276 })?;
277 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
278
279 if doc_signing_key != Some(&expected_did_key) {
280 warn!(
281 "DID {} has signing key {:?}, expected {}",
282 did, doc_signing_key, expected_did_key
283 );
284 return Err(ApiError::InvalidRequest(
285 "DID document verification method does not match expected signing key".into(),
286 ));
287 }
288 }
289 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
290 let client = crate::api::proxy_client::did_resolution_client();
291 let decoded = host_and_path.replace("%3A", ":");
292 let parts: Vec<&str> = decoded.split(':').collect();
293 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
294 {
295 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
296 } else {
297 (parts[0].to_string(), parts[1..].to_vec())
298 };
299 let scheme =
300 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
301 "http"
302 } else {
303 "https"
304 };
305 let url = if path_parts.is_empty() {
306 format!("{}://{}/.well-known/did.json", scheme, host)
307 } else {
308 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
309 };
310 let resp = client.get(&url).send().await.map_err(|e| {
311 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
312 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e))
313 })?;
314 let doc: serde_json::Value = resp.json().await.map_err(|e| {
315 warn!("Failed to parse did:web document for {}: {:?}", did, e);
316 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e))
317 })?;
318
319 let pds_endpoint = doc
320 .get("service")
321 .and_then(|s| s.as_array())
322 .and_then(|arr| {
323 arr.iter().find(|svc| {
324 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
325 || svc.get("type").and_then(|t| t.as_str())
326 == Some("AtprotoPersonalDataServer")
327 })
328 })
329 .and_then(|svc| svc.get("serviceEndpoint"))
330 .and_then(|e| e.as_str());
331
332 if pds_endpoint != Some(&expected_endpoint) {
333 warn!(
334 "DID {} has endpoint {:?}, expected {}",
335 did, pds_endpoint, expected_endpoint
336 );
337 return Err(ApiError::InvalidRequest(
338 "DID document atproto_pds service endpoint does not match PDS public url".into(),
339 ));
340 }
341 }
342
343 Ok(())
344}
345
346pub async fn activate_account(
347 State(state): State<AppState>,
348 headers: axum::http::HeaderMap,
349) -> Response {
350 info!("[MIGRATION] activateAccount called");
351 let extracted = match crate::auth::extract_auth_token_from_header(
352 headers.get("Authorization").and_then(|h| h.to_str().ok()),
353 ) {
354 Some(t) => t,
355 None => {
356 info!("[MIGRATION] activateAccount: No auth token");
357 return ApiError::AuthenticationRequired.into_response();
358 }
359 };
360 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
361 let http_uri = format!(
362 "https://{}/xrpc/com.atproto.server.activateAccount",
363 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
364 );
365 let auth_user = match crate::auth::validate_token_with_dpop(
366 &state.db,
367 &extracted.token,
368 extracted.is_dpop,
369 dpop_proof,
370 "POST",
371 &http_uri,
372 true,
373 )
374 .await
375 {
376 Ok(user) => user,
377 Err(e) => {
378 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
379 return ApiError::from(e).into_response();
380 }
381 };
382 info!(
383 "[MIGRATION] activateAccount: Authenticated user did={}",
384 auth_user.did
385 );
386
387 if let Err(e) = crate::auth::scope_check::check_account_scope(
388 auth_user.is_oauth,
389 auth_user.scope.as_deref(),
390 crate::oauth::scopes::AccountAttr::Repo,
391 crate::oauth::scopes::AccountAction::Manage,
392 ) {
393 info!("[MIGRATION] activateAccount: Scope check failed");
394 return e;
395 }
396
397 let did = auth_user.did;
398
399 info!(
400 "[MIGRATION] activateAccount: Validating DID document for did={}",
401 did
402 );
403 let did_validation_start = std::time::Instant::now();
404 if let Err(e) =
405 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true)
406 .await
407 {
408 info!(
409 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
410 did,
411 did_validation_start.elapsed()
412 );
413 return e.into_response();
414 }
415 info!(
416 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
417 did,
418 did_validation_start.elapsed()
419 );
420
421 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
422 .fetch_optional(&state.db)
423 .await
424 .ok()
425 .flatten();
426 info!(
427 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
428 did, handle
429 );
430 let result = sqlx::query!(
431 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
432 did.as_str()
433 )
434 .execute(&state.db)
435 .await;
436 match result {
437 Ok(_) => {
438 info!(
439 "[MIGRATION] activateAccount: DB update success for did={}",
440 did
441 );
442 if let Some(ref h) = handle {
443 let _ = state.cache.delete(&format!("handle:{}", h)).await;
444 }
445 info!(
446 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
447 did
448 );
449 if let Err(e) =
450 crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None)
451 .await
452 {
453 warn!(
454 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
455 e
456 );
457 } else {
458 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
459 }
460 info!(
461 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
462 did, handle
463 );
464 if let Err(e) = crate::api::repo::record::sequence_identity_event(
465 &state,
466 did.as_str(),
467 handle.as_deref(),
468 )
469 .await
470 {
471 warn!(
472 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
473 e
474 );
475 } else {
476 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
477 }
478 let repo_root = sqlx::query_scalar!(
479 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
480 did.as_str()
481 )
482 .fetch_optional(&state.db)
483 .await
484 .ok()
485 .flatten();
486 if let Some(root_cid) = repo_root {
487 info!(
488 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
489 did, root_cid
490 );
491 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
492 if let Ok(Some(block)) = state.block_store.get(&cid).await {
493 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
494 } else {
495 None
496 }
497 } else {
498 None
499 };
500 if let Err(e) = crate::api::repo::record::sequence_sync_event(
501 &state,
502 did.as_str(),
503 &root_cid,
504 rev.as_deref(),
505 )
506 .await
507 {
508 warn!(
509 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
510 e
511 );
512 } else {
513 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
514 }
515 } else {
516 warn!(
517 "[MIGRATION] activateAccount: No repo root found for did={}",
518 did
519 );
520 }
521 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
522 EmptyResponse::ok().into_response()
523 }
524 Err(e) => {
525 error!(
526 "[MIGRATION] activateAccount: DB error activating account: {:?}",
527 e
528 );
529 ApiError::InternalError(None).into_response()
530 }
531 }
532}
533
534#[derive(Deserialize)]
535#[serde(rename_all = "camelCase")]
536pub struct DeactivateAccountInput {
537 pub delete_after: Option<String>,
538}
539
540pub async fn deactivate_account(
541 State(state): State<AppState>,
542 headers: axum::http::HeaderMap,
543 Json(input): Json<DeactivateAccountInput>,
544) -> Response {
545 let extracted = match crate::auth::extract_auth_token_from_header(
546 headers.get("Authorization").and_then(|h| h.to_str().ok()),
547 ) {
548 Some(t) => t,
549 None => return ApiError::AuthenticationRequired.into_response(),
550 };
551 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
552 let http_uri = format!(
553 "https://{}/xrpc/com.atproto.server.deactivateAccount",
554 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
555 );
556 let auth_user = match crate::auth::validate_token_with_dpop(
557 &state.db,
558 &extracted.token,
559 extracted.is_dpop,
560 dpop_proof,
561 "POST",
562 &http_uri,
563 false,
564 )
565 .await
566 {
567 Ok(user) => user,
568 Err(e) => return ApiError::from(e).into_response(),
569 };
570
571 if let Err(e) = crate::auth::scope_check::check_account_scope(
572 auth_user.is_oauth,
573 auth_user.scope.as_deref(),
574 crate::oauth::scopes::AccountAttr::Repo,
575 crate::oauth::scopes::AccountAction::Manage,
576 ) {
577 return e;
578 }
579
580 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
581 .delete_after
582 .as_ref()
583 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
584 .map(|dt| dt.with_timezone(&chrono::Utc));
585
586 let did = auth_user.did;
587
588 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
589 .fetch_optional(&state.db)
590 .await
591 .ok()
592 .flatten();
593
594 let result = sqlx::query!(
595 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
596 did.as_str(),
597 delete_after
598 )
599 .execute(&state.db)
600 .await;
601
602 match result {
603 Ok(_) => {
604 if let Some(ref h) = handle {
605 let _ = state.cache.delete(&format!("handle:{}", h)).await;
606 }
607 if let Err(e) = crate::api::repo::record::sequence_account_event(
608 &state,
609 did.as_str(),
610 false,
611 Some("deactivated"),
612 )
613 .await
614 {
615 warn!("Failed to sequence account deactivated event: {}", e);
616 }
617 EmptyResponse::ok().into_response()
618 }
619 Err(e) => {
620 error!("DB error deactivating account: {:?}", e);
621 ApiError::InternalError(None).into_response()
622 }
623 }
624}
625
626pub async fn request_account_delete(
627 State(state): State<AppState>,
628 headers: axum::http::HeaderMap,
629) -> Response {
630 let extracted = match crate::auth::extract_auth_token_from_header(
631 headers.get("Authorization").and_then(|h| h.to_str().ok()),
632 ) {
633 Some(t) => t,
634 None => return ApiError::AuthenticationRequired.into_response(),
635 };
636 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
637 let http_uri = format!(
638 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
639 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
640 );
641 let validated = match crate::auth::validate_token_with_dpop(
642 &state.db,
643 &extracted.token,
644 extracted.is_dpop,
645 dpop_proof,
646 "POST",
647 &http_uri,
648 true,
649 )
650 .await
651 {
652 Ok(user) => user,
653 Err(e) => return ApiError::from(e).into_response(),
654 };
655 let did = validated.did.clone();
656
657 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await {
658 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str())
659 .await;
660 }
661
662 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
663 .fetch_optional(&state.db)
664 .await
665 {
666 Ok(Some(id)) => id,
667 _ => {
668 return ApiError::InternalError(None).into_response();
669 }
670 };
671 let confirmation_token = Uuid::new_v4().to_string();
672 let expires_at = Utc::now() + Duration::minutes(15);
673 let insert = sqlx::query!(
674 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
675 confirmation_token,
676 did.as_str(),
677 expires_at
678 )
679 .execute(&state.db)
680 .await;
681 if let Err(e) = insert {
682 error!("DB error creating deletion token: {:?}", e);
683 return ApiError::InternalError(None).into_response();
684 }
685 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
686 if let Err(e) =
687 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
688 .await
689 {
690 warn!("Failed to enqueue account deletion notification: {:?}", e);
691 }
692 info!("Account deletion requested for user {}", did);
693 EmptyResponse::ok().into_response()
694}
695
696#[derive(Deserialize)]
697pub struct DeleteAccountInput {
698 pub did: crate::types::Did,
699 pub password: PlainPassword,
700 pub token: String,
701}
702
703pub async fn delete_account(
704 State(state): State<AppState>,
705 Json(input): Json<DeleteAccountInput>,
706) -> Response {
707 let did = &input.did;
708 let password = &input.password;
709 let token = input.token.trim();
710 if password.is_empty() {
711 return ApiError::InvalidRequest("password is required".into()).into_response();
712 }
713 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
714 if password.len() > OLD_PASSWORD_MAX_LENGTH {
715 return ApiError::InvalidRequest("Invalid password length".into()).into_response();
716 }
717 if token.is_empty() {
718 return ApiError::InvalidToken(Some("token is required".into())).into_response();
719 }
720 let user = sqlx::query!(
721 "SELECT id, password_hash, handle FROM users WHERE did = $1",
722 did.as_str()
723 )
724 .fetch_optional(&state.db)
725 .await;
726 let (user_id, password_hash, handle) = match user {
727 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
728 Ok(None) => {
729 return ApiError::InvalidRequest("account not found".into()).into_response();
730 }
731 Err(e) => {
732 error!("DB error in delete_account: {:?}", e);
733 return ApiError::InternalError(None).into_response();
734 }
735 };
736 let password_valid = if password_hash
737 .as_ref()
738 .map(|h| verify(password, h).unwrap_or(false))
739 .unwrap_or(false)
740 {
741 true
742 } else {
743 let app_pass_rows = sqlx::query!(
744 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
745 user_id
746 )
747 .fetch_all(&state.db)
748 .await
749 .unwrap_or_default();
750 app_pass_rows
751 .iter()
752 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
753 };
754 if !password_valid {
755 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response();
756 }
757 let deletion_request = sqlx::query!(
758 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
759 token
760 )
761 .fetch_optional(&state.db)
762 .await;
763 let (token_did, expires_at) = match deletion_request {
764 Ok(Some(row)) => (row.did, row.expires_at),
765 Ok(None) => {
766 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response();
767 }
768 Err(e) => {
769 error!("DB error fetching deletion token: {:?}", e);
770 return ApiError::InternalError(None).into_response();
771 }
772 };
773 if token_did != did.as_str() {
774 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response();
775 }
776 if Utc::now() > expires_at {
777 let _ = sqlx::query!(
778 "DELETE FROM account_deletion_requests WHERE token = $1",
779 token
780 )
781 .execute(&state.db)
782 .await;
783 return ApiError::ExpiredToken(None).into_response();
784 }
785 let mut tx = match state.db.begin().await {
786 Ok(tx) => tx,
787 Err(e) => {
788 error!("Failed to begin transaction: {:?}", e);
789 return ApiError::InternalError(None).into_response();
790 }
791 };
792 let deletion_result: Result<(), sqlx::Error> = async {
793 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
794 .execute(&mut *tx)
795 .await?;
796 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
797 .execute(&mut *tx)
798 .await?;
799 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
800 .execute(&mut *tx)
801 .await?;
802 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
803 .execute(&mut *tx)
804 .await?;
805 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
806 .execute(&mut *tx)
807 .await?;
808 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
809 .execute(&mut *tx)
810 .await?;
811 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
812 .execute(&mut *tx)
813 .await?;
814 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
815 .execute(&mut *tx)
816 .await?;
817 Ok(())
818 }
819 .await;
820 match deletion_result {
821 Ok(()) => {
822 if let Err(e) = tx.commit().await {
823 error!("Failed to commit account deletion transaction: {:?}", e);
824 return ApiError::InternalError(None).into_response();
825 }
826 let account_seq = crate::api::repo::record::sequence_account_event(
827 &state,
828 did,
829 false,
830 Some("deleted"),
831 )
832 .await;
833 match account_seq {
834 Ok(seq) => {
835 if let Err(e) = sqlx::query!(
836 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
837 did,
838 seq
839 )
840 .execute(&state.db)
841 .await
842 {
843 warn!(
844 "Failed to cleanup sequences for deleted account {}: {}",
845 did, e
846 );
847 }
848 }
849 Err(e) => {
850 warn!(
851 "Failed to sequence account deletion event for {}: {}",
852 did, e
853 );
854 }
855 }
856 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
857 info!("Account {} deleted successfully", did);
858 EmptyResponse::ok().into_response()
859 }
860 Err(e) => {
861 error!("DB error deleting account, rolling back: {:?}", e);
862 ApiError::InternalError(None).into_response()
863 }
864 }
865}