this repo has no description
1use crate::api::EmptyResponse;
2use crate::api::error::ApiError;
3use crate::cache::Cache;
4use crate::plc::PlcClient;
5use crate::state::AppState;
6use crate::types::{Handle, PlainPassword};
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bcrypt::verify;
14use chrono::{Duration, Utc};
15use cid::Cid;
16use jacquard_repo::commit::Commit;
17use jacquard_repo::storage::BlockStore;
18use k256::ecdsa::SigningKey;
19use serde::{Deserialize, Serialize};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::{error, info, warn};
23use uuid::Uuid;
24
/// JSON response body produced by `check_account_status`.
///
/// Field names are serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CheckAccountStatusOutput {
    /// `true` when the user's `deactivated_at` column is NULL.
    pub activated: bool,
    /// Whether the DID document check for this service succeeded.
    pub valid_did: bool,
    /// Value of `repos.repo_root_cid` for the user; empty when no repo row was read.
    pub repo_commit: String,
    /// Value of `repos.repo_rev`, or the rev decoded from the commit block when that column is
    /// NULL; empty when neither is available.
    pub repo_rev: String,
    /// Number of `user_blocks` rows for the user.
    pub repo_blocks: i64,
    /// Number of `records` rows whose `repo_id` is the user's id.
    pub indexed_records: i64,
    /// The handler always reports `0` here.
    pub private_state_values: i64,
    /// Number of distinct `blob_cid` values in `record_blobs` for the user.
    pub expected_blobs: i64,
    /// Number of `blobs` rows whose `created_by_user` is the user's id.
    pub imported_blobs: i64,
}
38
39pub async fn check_account_status(
40 State(state): State<AppState>,
41 headers: axum::http::HeaderMap,
42) -> Response {
43 let extracted = match crate::auth::extract_auth_token_from_header(
44 headers.get("Authorization").and_then(|h| h.to_str().ok()),
45 ) {
46 Some(t) => t,
47 None => return ApiError::AuthenticationRequired.into_response(),
48 };
49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
50 let http_uri = format!(
51 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
53 );
54 let did = match crate::auth::validate_token_with_dpop(
55 &state.db,
56 &extracted.token,
57 extracted.is_dpop,
58 dpop_proof,
59 "GET",
60 &http_uri,
61 true,
62 false,
63 )
64 .await
65 {
66 Ok(user) => user.did,
67 Err(e) => return ApiError::from(e).into_response(),
68 };
69 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
70 .fetch_optional(&state.db)
71 .await
72 {
73 Ok(Some(id)) => id,
74 _ => {
75 return ApiError::InternalError(None).into_response();
76 }
77 };
78 let user_status = sqlx::query!(
79 "SELECT deactivated_at FROM users WHERE did = $1",
80 did.as_str()
81 )
82 .fetch_optional(&state.db)
83 .await;
84 let deactivated_at = match user_status {
85 Ok(Some(row)) => row.deactivated_at,
86 _ => None,
87 };
88 let repo_result = sqlx::query!(
89 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
90 user_id
91 )
92 .fetch_optional(&state.db)
93 .await;
94 let (repo_commit, repo_rev_from_db) = match repo_result {
95 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
96 _ => (String::new(), None),
97 };
98 let block_count: i64 = sqlx::query_scalar!(
99 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
100 user_id
101 )
102 .fetch_one(&state.db)
103 .await
104 .unwrap_or(Some(0))
105 .unwrap_or(0);
106 let repo_rev = if let Some(rev) = repo_rev_from_db {
107 rev
108 } else if !repo_commit.is_empty() {
109 if let Ok(cid) = Cid::from_str(&repo_commit) {
110 if let Ok(Some(block)) = state.block_store.get(&cid).await {
111 Commit::from_cbor(&block)
112 .ok()
113 .map(|c| c.rev().to_string())
114 .unwrap_or_default()
115 } else {
116 String::new()
117 }
118 } else {
119 String::new()
120 }
121 } else {
122 String::new()
123 };
124 let record_count: i64 =
125 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
126 .fetch_one(&state.db)
127 .await
128 .unwrap_or(Some(0))
129 .unwrap_or(0);
130 let imported_blobs: i64 = sqlx::query_scalar!(
131 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
132 user_id
133 )
134 .fetch_one(&state.db)
135 .await
136 .unwrap_or(Some(0))
137 .unwrap_or(0);
138 let expected_blobs: i64 = sqlx::query_scalar!(
139 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
140 user_id
141 )
142 .fetch_one(&state.db)
143 .await
144 .unwrap_or(Some(0))
145 .unwrap_or(0);
146 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await;
147 (
148 StatusCode::OK,
149 Json(CheckAccountStatusOutput {
150 activated: deactivated_at.is_none(),
151 valid_did,
152 repo_commit: repo_commit.clone(),
153 repo_rev,
154 repo_blocks: block_count as i64,
155 indexed_records: record_count,
156 private_state_values: 0,
157 expected_blobs,
158 imported_blobs,
159 }),
160 )
161 .into_response()
162}
163
164async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool {
165 assert_valid_did_document_for_service(db, cache, did, false)
166 .await
167 .is_ok()
168}
169
170async fn assert_valid_did_document_for_service(
171 db: &sqlx::PgPool,
172 cache: Arc<dyn Cache>,
173 did: &str,
174 with_retry: bool,
175) -> Result<(), ApiError> {
176 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
177 let expected_endpoint = format!("https://{}", hostname);
178
179 if did.starts_with("did:plc:") {
180 let plc_client = PlcClient::with_cache(None, Some(cache.clone()));
181
182 let max_attempts = if with_retry { 5 } else { 1 };
183 let mut last_error = None;
184 let mut doc_data = None;
185 for attempt in 0..max_attempts {
186 if attempt > 0 {
187 let delay_ms = 500 * (1 << (attempt - 1));
188 info!(
189 "Waiting {}ms before retry {} for DID document validation ({})",
190 delay_ms, attempt, did
191 );
192 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
193 }
194
195 match plc_client.get_document_data(did).await {
196 Ok(data) => {
197 let pds_endpoint = data
198 .get("services")
199 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
200 .and_then(|p| p.get("endpoint"))
201 .and_then(|e| e.as_str());
202
203 if pds_endpoint == Some(&expected_endpoint) {
204 doc_data = Some(data);
205 break;
206 } else {
207 info!(
208 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
209 attempt + 1,
210 did,
211 pds_endpoint,
212 expected_endpoint
213 );
214 last_error = Some(format!(
215 "DID document endpoint {:?} does not match expected {}",
216 pds_endpoint, expected_endpoint
217 ));
218 }
219 }
220 Err(e) => {
221 warn!(
222 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
223 attempt + 1,
224 did,
225 e
226 );
227 last_error = Some(format!("Could not resolve DID document: {}", e));
228 }
229 }
230 }
231
232 let Some(doc_data) = doc_data else {
233 return Err(ApiError::InvalidRequest(
234 last_error.unwrap_or_else(|| "DID document validation failed".to_string()),
235 ));
236 };
237
238 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
239 if let Some(ref expected_rotation_key) = server_rotation_key {
240 let rotation_keys = doc_data
241 .get("rotationKeys")
242 .and_then(|v| v.as_array())
243 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
244 .unwrap_or_default();
245 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
246 return Err(ApiError::InvalidRequest(
247 "Server rotation key not included in PLC DID data".into(),
248 ));
249 }
250 }
251
252 let doc_signing_key = doc_data
253 .get("verificationMethods")
254 .and_then(|v| v.get("atproto"))
255 .and_then(|k| k.as_str());
256
257 let user_row = sqlx::query!(
258 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
259 did
260 )
261 .fetch_optional(db)
262 .await
263 .map_err(|e| {
264 error!("Failed to fetch user key: {:?}", e);
265 ApiError::InternalError(None)
266 })?;
267
268 if let Some(row) = user_row {
269 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
270 .map_err(|e| {
271 error!("Failed to decrypt user key: {}", e);
272 ApiError::InternalError(None)
273 })?;
274 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
275 error!("Failed to create signing key: {:?}", e);
276 ApiError::InternalError(None)
277 })?;
278 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
279
280 if doc_signing_key != Some(&expected_did_key) {
281 warn!(
282 "DID {} has signing key {:?}, expected {}",
283 did, doc_signing_key, expected_did_key
284 );
285 return Err(ApiError::InvalidRequest(
286 "DID document verification method does not match expected signing key".into(),
287 ));
288 }
289 }
290 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
291 let client = crate::api::proxy_client::did_resolution_client();
292 let decoded = host_and_path.replace("%3A", ":");
293 let parts: Vec<&str> = decoded.split(':').collect();
294 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
295 {
296 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
297 } else {
298 (parts[0].to_string(), parts[1..].to_vec())
299 };
300 let scheme =
301 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
302 "http"
303 } else {
304 "https"
305 };
306 let url = if path_parts.is_empty() {
307 format!("{}://{}/.well-known/did.json", scheme, host)
308 } else {
309 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
310 };
311 let resp = client.get(&url).send().await.map_err(|e| {
312 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
313 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e))
314 })?;
315 let doc: serde_json::Value = resp.json().await.map_err(|e| {
316 warn!("Failed to parse did:web document for {}: {:?}", did, e);
317 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e))
318 })?;
319
320 let pds_endpoint = doc
321 .get("service")
322 .and_then(|s| s.as_array())
323 .and_then(|arr| {
324 arr.iter().find(|svc| {
325 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
326 || svc.get("type").and_then(|t| t.as_str())
327 == Some("AtprotoPersonalDataServer")
328 })
329 })
330 .and_then(|svc| svc.get("serviceEndpoint"))
331 .and_then(|e| e.as_str());
332
333 if pds_endpoint != Some(&expected_endpoint) {
334 warn!(
335 "DID {} has endpoint {:?}, expected {}",
336 did, pds_endpoint, expected_endpoint
337 );
338 return Err(ApiError::InvalidRequest(
339 "DID document atproto_pds service endpoint does not match PDS public url".into(),
340 ));
341 }
342 }
343
344 Ok(())
345}
346
347pub async fn activate_account(
348 State(state): State<AppState>,
349 headers: axum::http::HeaderMap,
350) -> Response {
351 info!("[MIGRATION] activateAccount called");
352 let extracted = match crate::auth::extract_auth_token_from_header(
353 headers.get("Authorization").and_then(|h| h.to_str().ok()),
354 ) {
355 Some(t) => t,
356 None => {
357 info!("[MIGRATION] activateAccount: No auth token");
358 return ApiError::AuthenticationRequired.into_response();
359 }
360 };
361 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
362 let http_uri = format!(
363 "https://{}/xrpc/com.atproto.server.activateAccount",
364 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
365 );
366 let auth_user = match crate::auth::validate_token_with_dpop(
367 &state.db,
368 &extracted.token,
369 extracted.is_dpop,
370 dpop_proof,
371 "POST",
372 &http_uri,
373 true,
374 false,
375 )
376 .await
377 {
378 Ok(user) => user,
379 Err(e) => {
380 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
381 return ApiError::from(e).into_response();
382 }
383 };
384 info!(
385 "[MIGRATION] activateAccount: Authenticated user did={}",
386 auth_user.did
387 );
388
389 if let Err(e) = crate::auth::scope_check::check_account_scope(
390 auth_user.is_oauth,
391 auth_user.scope.as_deref(),
392 crate::oauth::scopes::AccountAttr::Repo,
393 crate::oauth::scopes::AccountAction::Manage,
394 ) {
395 info!("[MIGRATION] activateAccount: Scope check failed");
396 return e;
397 }
398
399 let did = auth_user.did;
400
401 info!(
402 "[MIGRATION] activateAccount: Validating DID document for did={}",
403 did
404 );
405 let did_validation_start = std::time::Instant::now();
406 if let Err(e) =
407 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true)
408 .await
409 {
410 info!(
411 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
412 did,
413 did_validation_start.elapsed()
414 );
415 return e.into_response();
416 }
417 info!(
418 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
419 did,
420 did_validation_start.elapsed()
421 );
422
423 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
424 .fetch_optional(&state.db)
425 .await
426 .ok()
427 .flatten();
428 info!(
429 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
430 did, handle
431 );
432 let result = sqlx::query!(
433 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
434 did.as_str()
435 )
436 .execute(&state.db)
437 .await;
438 match result {
439 Ok(_) => {
440 info!(
441 "[MIGRATION] activateAccount: DB update success for did={}",
442 did
443 );
444 if let Some(ref h) = handle {
445 let _ = state.cache.delete(&format!("handle:{}", h)).await;
446 }
447 info!(
448 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
449 did
450 );
451 if let Err(e) =
452 crate::api::repo::record::sequence_account_event(&state, &did, true, None)
453 .await
454 {
455 warn!(
456 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
457 e
458 );
459 } else {
460 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
461 }
462 info!(
463 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
464 did, handle
465 );
466 let handle_typed = handle.as_ref().map(|h| Handle::new_unchecked(h));
467 if let Err(e) = crate::api::repo::record::sequence_identity_event(
468 &state,
469 &did,
470 handle_typed.as_ref(),
471 )
472 .await
473 {
474 warn!(
475 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
476 e
477 );
478 } else {
479 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
480 }
481 let repo_root = sqlx::query_scalar!(
482 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
483 did.as_str()
484 )
485 .fetch_optional(&state.db)
486 .await
487 .ok()
488 .flatten();
489 if let Some(root_cid) = repo_root {
490 info!(
491 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
492 did, root_cid
493 );
494 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
495 if let Ok(Some(block)) = state.block_store.get(&cid).await {
496 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
497 } else {
498 None
499 }
500 } else {
501 None
502 };
503 if let Err(e) = crate::api::repo::record::sequence_sync_event(
504 &state,
505 &did,
506 &root_cid,
507 rev.as_deref(),
508 )
509 .await
510 {
511 warn!(
512 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
513 e
514 );
515 } else {
516 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
517 }
518 } else {
519 warn!(
520 "[MIGRATION] activateAccount: No repo root found for did={}",
521 did
522 );
523 }
524 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
525 EmptyResponse::ok().into_response()
526 }
527 Err(e) => {
528 error!(
529 "[MIGRATION] activateAccount: DB error activating account: {:?}",
530 e
531 );
532 ApiError::InternalError(None).into_response()
533 }
534 }
535}
536
/// JSON request body accepted by `deactivate_account`; field names are read in camelCase.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeactivateAccountInput {
    /// Optional timestamp string (`deleteAfter` on the wire). The handler parses it as RFC 3339
    /// and writes the result to `users.delete_after`.
    pub delete_after: Option<String>,
}
542
543pub async fn deactivate_account(
544 State(state): State<AppState>,
545 headers: axum::http::HeaderMap,
546 Json(input): Json<DeactivateAccountInput>,
547) -> Response {
548 let extracted = match crate::auth::extract_auth_token_from_header(
549 headers.get("Authorization").and_then(|h| h.to_str().ok()),
550 ) {
551 Some(t) => t,
552 None => return ApiError::AuthenticationRequired.into_response(),
553 };
554 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
555 let http_uri = format!(
556 "https://{}/xrpc/com.atproto.server.deactivateAccount",
557 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
558 );
559 let auth_user = match crate::auth::validate_token_with_dpop(
560 &state.db,
561 &extracted.token,
562 extracted.is_dpop,
563 dpop_proof,
564 "POST",
565 &http_uri,
566 false,
567 false,
568 )
569 .await
570 {
571 Ok(user) => user,
572 Err(e) => return ApiError::from(e).into_response(),
573 };
574
575 if let Err(e) = crate::auth::scope_check::check_account_scope(
576 auth_user.is_oauth,
577 auth_user.scope.as_deref(),
578 crate::oauth::scopes::AccountAttr::Repo,
579 crate::oauth::scopes::AccountAction::Manage,
580 ) {
581 return e;
582 }
583
584 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
585 .delete_after
586 .as_ref()
587 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
588 .map(|dt| dt.with_timezone(&chrono::Utc));
589
590 let did = auth_user.did;
591
592 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
593 .fetch_optional(&state.db)
594 .await
595 .ok()
596 .flatten();
597
598 let result = sqlx::query!(
599 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
600 did.as_str(),
601 delete_after
602 )
603 .execute(&state.db)
604 .await;
605
606 match result {
607 Ok(_) => {
608 if let Some(ref h) = handle {
609 let _ = state.cache.delete(&format!("handle:{}", h)).await;
610 }
611 if let Err(e) = crate::api::repo::record::sequence_account_event(
612 &state,
613 &did,
614 false,
615 Some("deactivated"),
616 )
617 .await
618 {
619 warn!("Failed to sequence account deactivated event: {}", e);
620 }
621 EmptyResponse::ok().into_response()
622 }
623 Err(e) => {
624 error!("DB error deactivating account: {:?}", e);
625 ApiError::InternalError(None).into_response()
626 }
627 }
628}
629
630pub async fn request_account_delete(
631 State(state): State<AppState>,
632 headers: axum::http::HeaderMap,
633) -> Response {
634 let extracted = match crate::auth::extract_auth_token_from_header(
635 headers.get("Authorization").and_then(|h| h.to_str().ok()),
636 ) {
637 Some(t) => t,
638 None => return ApiError::AuthenticationRequired.into_response(),
639 };
640 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
641 let http_uri = format!(
642 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
643 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
644 );
645 let validated = match crate::auth::validate_token_with_dpop(
646 &state.db,
647 &extracted.token,
648 extracted.is_dpop,
649 dpop_proof,
650 "POST",
651 &http_uri,
652 true,
653 false,
654 )
655 .await
656 {
657 Ok(user) => user,
658 Err(e) => return ApiError::from(e).into_response(),
659 };
660 let did = validated.did.clone();
661
662 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await {
663 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str())
664 .await;
665 }
666
667 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
668 .fetch_optional(&state.db)
669 .await
670 {
671 Ok(Some(id)) => id,
672 _ => {
673 return ApiError::InternalError(None).into_response();
674 }
675 };
676 let confirmation_token = Uuid::new_v4().to_string();
677 let expires_at = Utc::now() + Duration::minutes(15);
678 let insert = sqlx::query!(
679 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
680 confirmation_token,
681 did.as_str(),
682 expires_at
683 )
684 .execute(&state.db)
685 .await;
686 if let Err(e) = insert {
687 error!("DB error creating deletion token: {:?}", e);
688 return ApiError::InternalError(None).into_response();
689 }
690 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
691 if let Err(e) =
692 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
693 .await
694 {
695 warn!("Failed to enqueue account deletion notification: {:?}", e);
696 }
697 info!("Account deletion requested for user {}", did);
698 EmptyResponse::ok().into_response()
699}
700
/// JSON request body accepted by `delete_account`.
#[derive(Deserialize)]
pub struct DeleteAccountInput {
    /// DID of the account to delete.
    pub did: crate::types::Did,
    /// Password verified against the account's password hash and, failing that, against the
    /// account's app passwords.
    pub password: PlainPassword,
    /// Confirmation token; it is trimmed and matched against `account_deletion_requests.token`.
    pub token: String,
}
707
708pub async fn delete_account(
709 State(state): State<AppState>,
710 Json(input): Json<DeleteAccountInput>,
711) -> Response {
712 let did = &input.did;
713 let password = &input.password;
714 let token = input.token.trim();
715 if password.is_empty() {
716 return ApiError::InvalidRequest("password is required".into()).into_response();
717 }
718 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
719 if password.len() > OLD_PASSWORD_MAX_LENGTH {
720 return ApiError::InvalidRequest("Invalid password length".into()).into_response();
721 }
722 if token.is_empty() {
723 return ApiError::InvalidToken(Some("token is required".into())).into_response();
724 }
725 let user = sqlx::query!(
726 "SELECT id, password_hash, handle FROM users WHERE did = $1",
727 did.as_str()
728 )
729 .fetch_optional(&state.db)
730 .await;
731 let (user_id, password_hash, handle) = match user {
732 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
733 Ok(None) => {
734 return ApiError::InvalidRequest("account not found".into()).into_response();
735 }
736 Err(e) => {
737 error!("DB error in delete_account: {:?}", e);
738 return ApiError::InternalError(None).into_response();
739 }
740 };
741 let password_valid = if password_hash
742 .as_ref()
743 .map(|h| verify(password, h).unwrap_or(false))
744 .unwrap_or(false)
745 {
746 true
747 } else {
748 let app_pass_rows = sqlx::query!(
749 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
750 user_id
751 )
752 .fetch_all(&state.db)
753 .await
754 .unwrap_or_default();
755 app_pass_rows
756 .iter()
757 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
758 };
759 if !password_valid {
760 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response();
761 }
762 let deletion_request = sqlx::query!(
763 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
764 token
765 )
766 .fetch_optional(&state.db)
767 .await;
768 let (token_did, expires_at) = match deletion_request {
769 Ok(Some(row)) => (row.did, row.expires_at),
770 Ok(None) => {
771 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response();
772 }
773 Err(e) => {
774 error!("DB error fetching deletion token: {:?}", e);
775 return ApiError::InternalError(None).into_response();
776 }
777 };
778 if token_did != did.as_str() {
779 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response();
780 }
781 if Utc::now() > expires_at {
782 let _ = sqlx::query!(
783 "DELETE FROM account_deletion_requests WHERE token = $1",
784 token
785 )
786 .execute(&state.db)
787 .await;
788 return ApiError::ExpiredToken(None).into_response();
789 }
790 let mut tx = match state.db.begin().await {
791 Ok(tx) => tx,
792 Err(e) => {
793 error!("Failed to begin transaction: {:?}", e);
794 return ApiError::InternalError(None).into_response();
795 }
796 };
797 let deletion_result: Result<(), sqlx::Error> = async {
798 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
799 .execute(&mut *tx)
800 .await?;
801 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
802 .execute(&mut *tx)
803 .await?;
804 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
805 .execute(&mut *tx)
806 .await?;
807 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
808 .execute(&mut *tx)
809 .await?;
810 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
811 .execute(&mut *tx)
812 .await?;
813 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
814 .execute(&mut *tx)
815 .await?;
816 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
817 .execute(&mut *tx)
818 .await?;
819 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
820 .execute(&mut *tx)
821 .await?;
822 Ok(())
823 }
824 .await;
825 match deletion_result {
826 Ok(()) => {
827 if let Err(e) = tx.commit().await {
828 error!("Failed to commit account deletion transaction: {:?}", e);
829 return ApiError::InternalError(None).into_response();
830 }
831 let account_seq = crate::api::repo::record::sequence_account_event(
832 &state,
833 did,
834 false,
835 Some("deleted"),
836 )
837 .await;
838 match account_seq {
839 Ok(seq) => {
840 if let Err(e) = sqlx::query!(
841 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
842 did,
843 seq
844 )
845 .execute(&state.db)
846 .await
847 {
848 warn!(
849 "Failed to cleanup sequences for deleted account {}: {}",
850 did, e
851 );
852 }
853 }
854 Err(e) => {
855 warn!(
856 "Failed to sequence account deletion event for {}: {}",
857 did, e
858 );
859 }
860 }
861 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
862 info!("Account {} deleted successfully", did);
863 EmptyResponse::ok().into_response()
864 }
865 Err(e) => {
866 error!("DB error deleting account, rolling back: {:?}", e);
867 ApiError::InternalError(None).into_response()
868 }
869 }
870}