this repo has no description
1use crate::api::error::ApiError;
2use crate::api::EmptyResponse;
3use crate::cache::Cache;
4use crate::plc::PlcClient;
5use crate::state::AppState;
6use crate::types::PlainPassword;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use bcrypt::verify;
14use chrono::{Duration, Utc};
15use cid::Cid;
16use jacquard_repo::commit::Commit;
17use jacquard_repo::storage::BlockStore;
18use k256::ecdsa::SigningKey;
19use serde::{Deserialize, Serialize};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::{error, info, warn};
23use uuid::Uuid;
24
25#[derive(Serialize)]
26#[serde(rename_all = "camelCase")]
27pub struct CheckAccountStatusOutput {
28 pub activated: bool,
29 pub valid_did: bool,
30 pub repo_commit: String,
31 pub repo_rev: String,
32 pub repo_blocks: i64,
33 pub indexed_records: i64,
34 pub private_state_values: i64,
35 pub expected_blobs: i64,
36 pub imported_blobs: i64,
37}
38
39pub async fn check_account_status(
40 State(state): State<AppState>,
41 headers: axum::http::HeaderMap,
42) -> Response {
43 let extracted = match crate::auth::extract_auth_token_from_header(
44 headers.get("Authorization").and_then(|h| h.to_str().ok()),
45 ) {
46 Some(t) => t,
47 None => return ApiError::AuthenticationRequired.into_response(),
48 };
49 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
50 let http_uri = format!(
51 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
52 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
53 );
54 let did = match crate::auth::validate_token_with_dpop(
55 &state.db,
56 &extracted.token,
57 extracted.is_dpop,
58 dpop_proof,
59 "GET",
60 &http_uri,
61 true,
62 )
63 .await
64 {
65 Ok(user) => user.did,
66 Err(e) => return ApiError::from(e).into_response(),
67 };
68 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
69 .fetch_optional(&state.db)
70 .await
71 {
72 Ok(Some(id)) => id,
73 _ => {
74 return ApiError::InternalError(None).into_response();
75 }
76 };
77 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did.as_str())
78 .fetch_optional(&state.db)
79 .await;
80 let deactivated_at = match user_status {
81 Ok(Some(row)) => row.deactivated_at,
82 _ => None,
83 };
84 let repo_result = sqlx::query!(
85 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
86 user_id
87 )
88 .fetch_optional(&state.db)
89 .await;
90 let (repo_commit, repo_rev_from_db) = match repo_result {
91 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
92 _ => (String::new(), None),
93 };
94 let block_count: i64 = sqlx::query_scalar!(
95 "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
96 user_id
97 )
98 .fetch_one(&state.db)
99 .await
100 .unwrap_or(Some(0))
101 .unwrap_or(0);
102 let repo_rev = if let Some(rev) = repo_rev_from_db {
103 rev
104 } else if !repo_commit.is_empty() {
105 if let Ok(cid) = Cid::from_str(&repo_commit) {
106 if let Ok(Some(block)) = state.block_store.get(&cid).await {
107 Commit::from_cbor(&block)
108 .ok()
109 .map(|c| c.rev().to_string())
110 .unwrap_or_default()
111 } else {
112 String::new()
113 }
114 } else {
115 String::new()
116 }
117 } else {
118 String::new()
119 };
120 let record_count: i64 =
121 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
122 .fetch_one(&state.db)
123 .await
124 .unwrap_or(Some(0))
125 .unwrap_or(0);
126 let imported_blobs: i64 = sqlx::query_scalar!(
127 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
128 user_id
129 )
130 .fetch_one(&state.db)
131 .await
132 .unwrap_or(Some(0))
133 .unwrap_or(0);
134 let expected_blobs: i64 = sqlx::query_scalar!(
135 "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
136 user_id
137 )
138 .fetch_one(&state.db)
139 .await
140 .unwrap_or(Some(0))
141 .unwrap_or(0);
142 let valid_did = is_valid_did_for_service(&state.db, state.cache.clone(), did.as_str()).await;
143 (
144 StatusCode::OK,
145 Json(CheckAccountStatusOutput {
146 activated: deactivated_at.is_none(),
147 valid_did,
148 repo_commit: repo_commit.clone(),
149 repo_rev,
150 repo_blocks: block_count as i64,
151 indexed_records: record_count,
152 private_state_values: 0,
153 expected_blobs,
154 imported_blobs,
155 }),
156 )
157 .into_response()
158}
159
160async fn is_valid_did_for_service(db: &sqlx::PgPool, cache: Arc<dyn Cache>, did: &str) -> bool {
161 assert_valid_did_document_for_service(db, cache, did, false)
162 .await
163 .is_ok()
164}
165
166async fn assert_valid_did_document_for_service(
167 db: &sqlx::PgPool,
168 cache: Arc<dyn Cache>,
169 did: &str,
170 with_retry: bool,
171) -> Result<(), ApiError> {
172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
173 let expected_endpoint = format!("https://{}", hostname);
174
175 if did.starts_with("did:plc:") {
176 let plc_client = PlcClient::with_cache(None, Some(cache.clone()));
177
178 let max_attempts = if with_retry { 5 } else { 1 };
179 let mut last_error = None;
180 let mut doc_data = None;
181 for attempt in 0..max_attempts {
182 if attempt > 0 {
183 let delay_ms = 500 * (1 << (attempt - 1));
184 info!(
185 "Waiting {}ms before retry {} for DID document validation ({})",
186 delay_ms, attempt, did
187 );
188 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
189 }
190
191 match plc_client.get_document_data(did).await {
192 Ok(data) => {
193 let pds_endpoint = data
194 .get("services")
195 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
196 .and_then(|p| p.get("endpoint"))
197 .and_then(|e| e.as_str());
198
199 if pds_endpoint == Some(&expected_endpoint) {
200 doc_data = Some(data);
201 break;
202 } else {
203 info!(
204 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
205 attempt + 1,
206 did,
207 pds_endpoint,
208 expected_endpoint
209 );
210 last_error = Some(format!(
211 "DID document endpoint {:?} does not match expected {}",
212 pds_endpoint, expected_endpoint
213 ));
214 }
215 }
216 Err(e) => {
217 warn!(
218 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
219 attempt + 1,
220 did,
221 e
222 );
223 last_error = Some(format!("Could not resolve DID document: {}", e));
224 }
225 }
226 }
227
228 let Some(doc_data) = doc_data else {
229 return Err(ApiError::InvalidRequest(
230 last_error.unwrap_or_else(|| "DID document validation failed".to_string()),
231 ));
232 };
233
234 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
235 if let Some(ref expected_rotation_key) = server_rotation_key {
236 let rotation_keys = doc_data
237 .get("rotationKeys")
238 .and_then(|v| v.as_array())
239 .map(|arr| arr.iter().filter_map(|k| k.as_str()).collect::<Vec<_>>())
240 .unwrap_or_default();
241 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
242 return Err(ApiError::InvalidRequest(
243 "Server rotation key not included in PLC DID data".into(),
244 ));
245 }
246 }
247
248 let doc_signing_key = doc_data
249 .get("verificationMethods")
250 .and_then(|v| v.get("atproto"))
251 .and_then(|k| k.as_str());
252
253 let user_row = sqlx::query!(
254 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
255 did
256 )
257 .fetch_optional(db)
258 .await
259 .map_err(|e| {
260 error!("Failed to fetch user key: {:?}", e);
261 ApiError::InternalError(None)
262 })?;
263
264 if let Some(row) = user_row {
265 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
266 .map_err(|e| {
267 error!("Failed to decrypt user key: {}", e);
268 ApiError::InternalError(None)
269 })?;
270 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
271 error!("Failed to create signing key: {:?}", e);
272 ApiError::InternalError(None)
273 })?;
274 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
275
276 if doc_signing_key != Some(&expected_did_key) {
277 warn!(
278 "DID {} has signing key {:?}, expected {}",
279 did, doc_signing_key, expected_did_key
280 );
281 return Err(ApiError::InvalidRequest(
282 "DID document verification method does not match expected signing key".into(),
283 ));
284 }
285 }
286 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
287 let client = crate::api::proxy_client::did_resolution_client();
288 let decoded = host_and_path.replace("%3A", ":");
289 let parts: Vec<&str> = decoded.split(':').collect();
290 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
291 {
292 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
293 } else {
294 (parts[0].to_string(), parts[1..].to_vec())
295 };
296 let scheme =
297 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
298 "http"
299 } else {
300 "https"
301 };
302 let url = if path_parts.is_empty() {
303 format!("{}://{}/.well-known/did.json", scheme, host)
304 } else {
305 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
306 };
307 let resp = client.get(&url).send().await.map_err(|e| {
308 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
309 ApiError::InvalidRequest(format!("Could not resolve DID document: {}", e))
310 })?;
311 let doc: serde_json::Value = resp.json().await.map_err(|e| {
312 warn!("Failed to parse did:web document for {}: {:?}", did, e);
313 ApiError::InvalidRequest(format!("Could not parse DID document: {}", e))
314 })?;
315
316 let pds_endpoint = doc
317 .get("service")
318 .and_then(|s| s.as_array())
319 .and_then(|arr| {
320 arr.iter().find(|svc| {
321 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
322 || svc.get("type").and_then(|t| t.as_str())
323 == Some("AtprotoPersonalDataServer")
324 })
325 })
326 .and_then(|svc| svc.get("serviceEndpoint"))
327 .and_then(|e| e.as_str());
328
329 if pds_endpoint != Some(&expected_endpoint) {
330 warn!(
331 "DID {} has endpoint {:?}, expected {}",
332 did, pds_endpoint, expected_endpoint
333 );
334 return Err(ApiError::InvalidRequest(
335 "DID document atproto_pds service endpoint does not match PDS public url".into(),
336 ));
337 }
338 }
339
340 Ok(())
341}
342
343pub async fn activate_account(
344 State(state): State<AppState>,
345 headers: axum::http::HeaderMap,
346) -> Response {
347 info!("[MIGRATION] activateAccount called");
348 let extracted = match crate::auth::extract_auth_token_from_header(
349 headers.get("Authorization").and_then(|h| h.to_str().ok()),
350 ) {
351 Some(t) => t,
352 None => {
353 info!("[MIGRATION] activateAccount: No auth token");
354 return ApiError::AuthenticationRequired.into_response();
355 }
356 };
357 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
358 let http_uri = format!(
359 "https://{}/xrpc/com.atproto.server.activateAccount",
360 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
361 );
362 let auth_user = match crate::auth::validate_token_with_dpop(
363 &state.db,
364 &extracted.token,
365 extracted.is_dpop,
366 dpop_proof,
367 "POST",
368 &http_uri,
369 true,
370 )
371 .await
372 {
373 Ok(user) => user,
374 Err(e) => {
375 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
376 return ApiError::from(e).into_response();
377 }
378 };
379 info!(
380 "[MIGRATION] activateAccount: Authenticated user did={}",
381 auth_user.did
382 );
383
384 if let Err(e) = crate::auth::scope_check::check_account_scope(
385 auth_user.is_oauth,
386 auth_user.scope.as_deref(),
387 crate::oauth::scopes::AccountAttr::Repo,
388 crate::oauth::scopes::AccountAction::Manage,
389 ) {
390 info!("[MIGRATION] activateAccount: Scope check failed");
391 return e;
392 }
393
394 let did = auth_user.did;
395
396 info!(
397 "[MIGRATION] activateAccount: Validating DID document for did={}",
398 did
399 );
400 let did_validation_start = std::time::Instant::now();
401 if let Err(e) =
402 assert_valid_did_document_for_service(&state.db, state.cache.clone(), did.as_str(), true).await
403 {
404 info!(
405 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
406 did,
407 did_validation_start.elapsed()
408 );
409 return e.into_response();
410 }
411 info!(
412 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
413 did,
414 did_validation_start.elapsed()
415 );
416
417 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
418 .fetch_optional(&state.db)
419 .await
420 .ok()
421 .flatten();
422 info!(
423 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
424 did, handle
425 );
426 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str())
427 .execute(&state.db)
428 .await;
429 match result {
430 Ok(_) => {
431 info!(
432 "[MIGRATION] activateAccount: DB update success for did={}",
433 did
434 );
435 if let Some(ref h) = handle {
436 let _ = state.cache.delete(&format!("handle:{}", h)).await;
437 }
438 info!(
439 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
440 did
441 );
442 if let Err(e) =
443 crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None).await
444 {
445 warn!(
446 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
447 e
448 );
449 } else {
450 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
451 }
452 info!(
453 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
454 did, handle
455 );
456 if let Err(e) =
457 crate::api::repo::record::sequence_identity_event(&state, did.as_str(), handle.as_deref())
458 .await
459 {
460 warn!(
461 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
462 e
463 );
464 } else {
465 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
466 }
467 let repo_root = sqlx::query_scalar!(
468 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
469 did.as_str()
470 )
471 .fetch_optional(&state.db)
472 .await
473 .ok()
474 .flatten();
475 if let Some(root_cid) = repo_root {
476 info!(
477 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
478 did, root_cid
479 );
480 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
481 if let Ok(Some(block)) = state.block_store.get(&cid).await {
482 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
483 } else {
484 None
485 }
486 } else {
487 None
488 };
489 if let Err(e) = crate::api::repo::record::sequence_sync_event(
490 &state,
491 did.as_str(),
492 &root_cid,
493 rev.as_deref(),
494 )
495 .await
496 {
497 warn!(
498 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
499 e
500 );
501 } else {
502 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
503 }
504 } else {
505 warn!(
506 "[MIGRATION] activateAccount: No repo root found for did={}",
507 did
508 );
509 }
510 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
511 EmptyResponse::ok().into_response()
512 }
513 Err(e) => {
514 error!(
515 "[MIGRATION] activateAccount: DB error activating account: {:?}",
516 e
517 );
518 ApiError::InternalError(None).into_response()
519 }
520 }
521}
522
523#[derive(Deserialize)]
524#[serde(rename_all = "camelCase")]
525pub struct DeactivateAccountInput {
526 pub delete_after: Option<String>,
527}
528
529pub async fn deactivate_account(
530 State(state): State<AppState>,
531 headers: axum::http::HeaderMap,
532 Json(input): Json<DeactivateAccountInput>,
533) -> Response {
534 let extracted = match crate::auth::extract_auth_token_from_header(
535 headers.get("Authorization").and_then(|h| h.to_str().ok()),
536 ) {
537 Some(t) => t,
538 None => return ApiError::AuthenticationRequired.into_response(),
539 };
540 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
541 let http_uri = format!(
542 "https://{}/xrpc/com.atproto.server.deactivateAccount",
543 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
544 );
545 let auth_user = match crate::auth::validate_token_with_dpop(
546 &state.db,
547 &extracted.token,
548 extracted.is_dpop,
549 dpop_proof,
550 "POST",
551 &http_uri,
552 false,
553 )
554 .await
555 {
556 Ok(user) => user,
557 Err(e) => return ApiError::from(e).into_response(),
558 };
559
560 if let Err(e) = crate::auth::scope_check::check_account_scope(
561 auth_user.is_oauth,
562 auth_user.scope.as_deref(),
563 crate::oauth::scopes::AccountAttr::Repo,
564 crate::oauth::scopes::AccountAction::Manage,
565 ) {
566 return e;
567 }
568
569 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
570 .delete_after
571 .as_ref()
572 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
573 .map(|dt| dt.with_timezone(&chrono::Utc));
574
575 let did = auth_user.did;
576
577 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
578 .fetch_optional(&state.db)
579 .await
580 .ok()
581 .flatten();
582
583 let result = sqlx::query!(
584 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
585 did.as_str(),
586 delete_after
587 )
588 .execute(&state.db)
589 .await;
590
591 match result {
592 Ok(_) => {
593 if let Some(ref h) = handle {
594 let _ = state.cache.delete(&format!("handle:{}", h)).await;
595 }
596 if let Err(e) = crate::api::repo::record::sequence_account_event(
597 &state,
598 did.as_str(),
599 false,
600 Some("deactivated"),
601 )
602 .await
603 {
604 warn!("Failed to sequence account deactivated event: {}", e);
605 }
606 EmptyResponse::ok().into_response()
607 }
608 Err(e) => {
609 error!("DB error deactivating account: {:?}", e);
610 ApiError::InternalError(None).into_response()
611 }
612 }
613}
614
615pub async fn request_account_delete(
616 State(state): State<AppState>,
617 headers: axum::http::HeaderMap,
618) -> Response {
619 let extracted = match crate::auth::extract_auth_token_from_header(
620 headers.get("Authorization").and_then(|h| h.to_str().ok()),
621 ) {
622 Some(t) => t,
623 None => return ApiError::AuthenticationRequired.into_response(),
624 };
625 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
626 let http_uri = format!(
627 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
628 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
629 );
630 let validated = match crate::auth::validate_token_with_dpop(
631 &state.db,
632 &extracted.token,
633 extracted.is_dpop,
634 dpop_proof,
635 "POST",
636 &http_uri,
637 true,
638 )
639 .await
640 {
641 Ok(user) => user,
642 Err(e) => return ApiError::from(e).into_response(),
643 };
644 let did = validated.did.clone();
645
646 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, did.as_str()).await {
647 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, did.as_str()).await;
648 }
649
650 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
651 .fetch_optional(&state.db)
652 .await
653 {
654 Ok(Some(id)) => id,
655 _ => {
656 return ApiError::InternalError(None).into_response();
657 }
658 };
659 let confirmation_token = Uuid::new_v4().to_string();
660 let expires_at = Utc::now() + Duration::minutes(15);
661 let insert = sqlx::query!(
662 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
663 confirmation_token,
664 did.as_str(),
665 expires_at
666 )
667 .execute(&state.db)
668 .await;
669 if let Err(e) = insert {
670 error!("DB error creating deletion token: {:?}", e);
671 return ApiError::InternalError(None).into_response();
672 }
673 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
674 if let Err(e) =
675 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
676 .await
677 {
678 warn!("Failed to enqueue account deletion notification: {:?}", e);
679 }
680 info!("Account deletion requested for user {}", did);
681 EmptyResponse::ok().into_response()
682}
683
684#[derive(Deserialize)]
685pub struct DeleteAccountInput {
686 pub did: crate::types::Did,
687 pub password: PlainPassword,
688 pub token: String,
689}
690
691pub async fn delete_account(
692 State(state): State<AppState>,
693 Json(input): Json<DeleteAccountInput>,
694) -> Response {
695 let did = &input.did;
696 let password = &input.password;
697 let token = input.token.trim();
698 if password.is_empty() {
699 return ApiError::InvalidRequest("password is required".into()).into_response();
700 }
701 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
702 if password.len() > OLD_PASSWORD_MAX_LENGTH {
703 return ApiError::InvalidRequest("Invalid password length".into()).into_response();
704 }
705 if token.is_empty() {
706 return ApiError::InvalidToken(Some("token is required".into())).into_response();
707 }
708 let user = sqlx::query!(
709 "SELECT id, password_hash, handle FROM users WHERE did = $1",
710 did.as_str()
711 )
712 .fetch_optional(&state.db)
713 .await;
714 let (user_id, password_hash, handle) = match user {
715 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
716 Ok(None) => {
717 return ApiError::InvalidRequest("account not found".into()).into_response();
718 }
719 Err(e) => {
720 error!("DB error in delete_account: {:?}", e);
721 return ApiError::InternalError(None).into_response();
722 }
723 };
724 let password_valid = if password_hash
725 .as_ref()
726 .map(|h| verify(password, h).unwrap_or(false))
727 .unwrap_or(false)
728 {
729 true
730 } else {
731 let app_pass_rows = sqlx::query!(
732 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
733 user_id
734 )
735 .fetch_all(&state.db)
736 .await
737 .unwrap_or_default();
738 app_pass_rows
739 .iter()
740 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
741 };
742 if !password_valid {
743 return ApiError::AuthenticationFailed(Some("Invalid password".into())).into_response();
744 }
745 let deletion_request = sqlx::query!(
746 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
747 token
748 )
749 .fetch_optional(&state.db)
750 .await;
751 let (token_did, expires_at) = match deletion_request {
752 Ok(Some(row)) => (row.did, row.expires_at),
753 Ok(None) => {
754 return ApiError::InvalidToken(Some("Invalid or expired token".into())).into_response();
755 }
756 Err(e) => {
757 error!("DB error fetching deletion token: {:?}", e);
758 return ApiError::InternalError(None).into_response();
759 }
760 };
761 if token_did != did.as_str() {
762 return ApiError::InvalidToken(Some("Token does not match account".into())).into_response();
763 }
764 if Utc::now() > expires_at {
765 let _ = sqlx::query!(
766 "DELETE FROM account_deletion_requests WHERE token = $1",
767 token
768 )
769 .execute(&state.db)
770 .await;
771 return ApiError::ExpiredToken(None).into_response();
772 }
773 let mut tx = match state.db.begin().await {
774 Ok(tx) => tx,
775 Err(e) => {
776 error!("Failed to begin transaction: {:?}", e);
777 return ApiError::InternalError(None).into_response();
778 }
779 };
780 let deletion_result: Result<(), sqlx::Error> = async {
781 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
782 .execute(&mut *tx)
783 .await?;
784 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
785 .execute(&mut *tx)
786 .await?;
787 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
788 .execute(&mut *tx)
789 .await?;
790 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
791 .execute(&mut *tx)
792 .await?;
793 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
794 .execute(&mut *tx)
795 .await?;
796 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
797 .execute(&mut *tx)
798 .await?;
799 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
800 .execute(&mut *tx)
801 .await?;
802 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
803 .execute(&mut *tx)
804 .await?;
805 Ok(())
806 }
807 .await;
808 match deletion_result {
809 Ok(()) => {
810 if let Err(e) = tx.commit().await {
811 error!("Failed to commit account deletion transaction: {:?}", e);
812 return ApiError::InternalError(None).into_response();
813 }
814 let account_seq = crate::api::repo::record::sequence_account_event(
815 &state,
816 did,
817 false,
818 Some("deleted"),
819 )
820 .await;
821 match account_seq {
822 Ok(seq) => {
823 if let Err(e) = sqlx::query!(
824 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
825 did,
826 seq
827 )
828 .execute(&state.db)
829 .await
830 {
831 warn!(
832 "Failed to cleanup sequences for deleted account {}: {}",
833 did, e
834 );
835 }
836 }
837 Err(e) => {
838 warn!(
839 "Failed to sequence account deletion event for {}: {}",
840 did, e
841 );
842 }
843 }
844 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
845 info!("Account {} deleted successfully", did);
846 EmptyResponse::ok().into_response()
847 }
848 Err(e) => {
849 error!("DB error deleting account, rolling back: {:?}", e);
850 ApiError::InternalError(None).into_response()
851 }
852 }
853}