this repo has no description
1use crate::api::ApiError;
2use crate::plc::PlcClient;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::{Duration, Utc};
12use cid::Cid;
13use jacquard_repo::commit::Commit;
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use tracing::{error, info, warn};
20use uuid::Uuid;
21
22#[derive(Serialize)]
23#[serde(rename_all = "camelCase")]
24pub struct CheckAccountStatusOutput {
25 pub activated: bool,
26 pub valid_did: bool,
27 pub repo_commit: String,
28 pub repo_rev: String,
29 pub repo_blocks: i64,
30 pub indexed_records: i64,
31 pub private_state_values: i64,
32 pub expected_blobs: i64,
33 pub imported_blobs: i64,
34}
35
36pub async fn check_account_status(
37 State(state): State<AppState>,
38 headers: axum::http::HeaderMap,
39) -> Response {
40 let extracted = match crate::auth::extract_auth_token_from_header(
41 headers.get("Authorization").and_then(|h| h.to_str().ok()),
42 ) {
43 Some(t) => t,
44 None => return ApiError::AuthenticationRequired.into_response(),
45 };
46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
47 let http_uri = format!(
48 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
50 );
51 let did = match crate::auth::validate_token_with_dpop(
52 &state.db,
53 &extracted.token,
54 extracted.is_dpop,
55 dpop_proof,
56 "GET",
57 &http_uri,
58 true,
59 )
60 .await
61 {
62 Ok(user) => user.did,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(id)) => id,
70 _ => {
71 return (
72 StatusCode::INTERNAL_SERVER_ERROR,
73 Json(json!({"error": "InternalError"})),
74 )
75 .into_response();
76 }
77 };
78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
79 .fetch_optional(&state.db)
80 .await;
81 let deactivated_at = match user_status {
82 Ok(Some(row)) => row.deactivated_at,
83 _ => None,
84 };
85 let repo_result = sqlx::query!(
86 "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
87 user_id
88 )
89 .fetch_optional(&state.db)
90 .await;
91 let (repo_commit, repo_rev_from_db) = match repo_result {
92 Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
93 _ => (String::new(), None),
94 };
95 let block_count: i64 =
96 sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id)
97 .fetch_one(&state.db)
98 .await
99 .unwrap_or(Some(0))
100 .unwrap_or(0);
101 let repo_rev = if let Some(rev) = repo_rev_from_db {
102 rev
103 } else if !repo_commit.is_empty() {
104 if let Ok(cid) = Cid::from_str(&repo_commit) {
105 if let Ok(Some(block)) = state.block_store.get(&cid).await {
106 Commit::from_cbor(&block)
107 .ok()
108 .map(|c| c.rev().to_string())
109 .unwrap_or_default()
110 } else {
111 String::new()
112 }
113 } else {
114 String::new()
115 }
116 } else {
117 String::new()
118 };
119 let record_count: i64 =
120 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
121 .fetch_one(&state.db)
122 .await
123 .unwrap_or(Some(0))
124 .unwrap_or(0);
125 let blob_count: i64 = sqlx::query_scalar!(
126 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
127 user_id
128 )
129 .fetch_one(&state.db)
130 .await
131 .unwrap_or(Some(0))
132 .unwrap_or(0);
133 let valid_did = is_valid_did_for_service(&state.db, &did).await;
134 (
135 StatusCode::OK,
136 Json(CheckAccountStatusOutput {
137 activated: deactivated_at.is_none(),
138 valid_did,
139 repo_commit: repo_commit.clone(),
140 repo_rev,
141 repo_blocks: block_count as i64,
142 indexed_records: record_count,
143 private_state_values: 0,
144 expected_blobs: blob_count,
145 imported_blobs: blob_count,
146 }),
147 )
148 .into_response()
149}
150
151async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool {
152 assert_valid_did_document_for_service(db, did, false)
153 .await
154 .is_ok()
155}
156
157async fn assert_valid_did_document_for_service(
158 db: &sqlx::PgPool,
159 did: &str,
160 with_retry: bool,
161) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
162 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
163 let expected_endpoint = format!("https://{}", hostname);
164
165 if did.starts_with("did:plc:") {
166 let plc_client = PlcClient::new(None);
167
168 let max_attempts = if with_retry { 5 } else { 1 };
169 let mut last_error = None;
170 let mut doc_data = None;
171 for attempt in 0..max_attempts {
172 if attempt > 0 {
173 let delay_ms = 500 * (1 << (attempt - 1));
174 info!(
175 "Waiting {}ms before retry {} for DID document validation ({})",
176 delay_ms, attempt, did
177 );
178 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
179 }
180
181 match plc_client.get_document_data(did).await {
182 Ok(data) => {
183 let pds_endpoint = data
184 .get("services")
185 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
186 .and_then(|p| p.get("endpoint"))
187 .and_then(|e| e.as_str());
188
189 if pds_endpoint == Some(&expected_endpoint) {
190 doc_data = Some(data);
191 break;
192 } else {
193 info!(
194 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
195 attempt + 1,
196 did,
197 pds_endpoint,
198 expected_endpoint
199 );
200 last_error = Some(format!(
201 "DID document endpoint {:?} does not match expected {}",
202 pds_endpoint, expected_endpoint
203 ));
204 }
205 }
206 Err(e) => {
207 warn!(
208 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
209 attempt + 1,
210 did,
211 e
212 );
213 last_error = Some(format!("Could not resolve DID document: {}", e));
214 }
215 }
216 }
217
218 let doc_data = match doc_data {
219 Some(d) => d,
220 None => {
221 return Err((
222 StatusCode::BAD_REQUEST,
223 Json(json!({
224 "error": "InvalidRequest",
225 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string())
226 })),
227 ));
228 }
229 };
230
231 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
232 if let Some(ref expected_rotation_key) = server_rotation_key {
233 let rotation_keys = doc_data
234 .get("rotationKeys")
235 .and_then(|v| v.as_array())
236 .map(|arr| {
237 arr.iter()
238 .filter_map(|k| k.as_str())
239 .collect::<Vec<_>>()
240 })
241 .unwrap_or_default();
242 if !rotation_keys.contains(&expected_rotation_key.as_str()) {
243 return Err((
244 StatusCode::BAD_REQUEST,
245 Json(json!({
246 "error": "InvalidRequest",
247 "message": "Server rotation key not included in PLC DID data"
248 })),
249 ));
250 }
251 }
252
253 let doc_signing_key = doc_data
254 .get("verificationMethods")
255 .and_then(|v| v.get("atproto"))
256 .and_then(|k| k.as_str());
257
258 let user_row = sqlx::query!(
259 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
260 did
261 )
262 .fetch_optional(db)
263 .await
264 .map_err(|e| {
265 error!("Failed to fetch user key: {:?}", e);
266 (
267 StatusCode::INTERNAL_SERVER_ERROR,
268 Json(json!({"error": "InternalError"})),
269 )
270 })?;
271
272 if let Some(row) = user_row {
273 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
274 .map_err(|e| {
275 error!("Failed to decrypt user key: {}", e);
276 (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError"})),
279 )
280 })?;
281 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
282 error!("Failed to create signing key: {:?}", e);
283 (
284 StatusCode::INTERNAL_SERVER_ERROR,
285 Json(json!({"error": "InternalError"})),
286 )
287 })?;
288 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
289
290 if doc_signing_key != Some(&expected_did_key) {
291 warn!(
292 "DID {} has signing key {:?}, expected {}",
293 did, doc_signing_key, expected_did_key
294 );
295 return Err((
296 StatusCode::BAD_REQUEST,
297 Json(json!({
298 "error": "InvalidRequest",
299 "message": "DID document verification method does not match expected signing key"
300 })),
301 ));
302 }
303 }
304 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
305 let client = reqwest::Client::new();
306 let decoded = host_and_path.replace("%3A", ":");
307 let parts: Vec<&str> = decoded.split(':').collect();
308 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
309 {
310 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
311 } else {
312 (parts[0].to_string(), parts[1..].to_vec())
313 };
314 let scheme =
315 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
316 "http"
317 } else {
318 "https"
319 };
320 let url = if path_parts.is_empty() {
321 format!("{}://{}/.well-known/did.json", scheme, host)
322 } else {
323 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
324 };
325 let resp = client.get(&url).send().await.map_err(|e| {
326 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
327 (
328 StatusCode::BAD_REQUEST,
329 Json(json!({
330 "error": "InvalidRequest",
331 "message": format!("Could not resolve DID document: {}", e)
332 })),
333 )
334 })?;
335 let doc: serde_json::Value = resp.json().await.map_err(|e| {
336 warn!("Failed to parse did:web document for {}: {:?}", did, e);
337 (
338 StatusCode::BAD_REQUEST,
339 Json(json!({
340 "error": "InvalidRequest",
341 "message": format!("Could not parse DID document: {}", e)
342 })),
343 )
344 })?;
345
346 let pds_endpoint = doc
347 .get("service")
348 .and_then(|s| s.as_array())
349 .and_then(|arr| {
350 arr.iter().find(|svc| {
351 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
352 || svc.get("type").and_then(|t| t.as_str())
353 == Some("AtprotoPersonalDataServer")
354 })
355 })
356 .and_then(|svc| svc.get("serviceEndpoint"))
357 .and_then(|e| e.as_str());
358
359 if pds_endpoint != Some(&expected_endpoint) {
360 warn!(
361 "DID {} has endpoint {:?}, expected {}",
362 did, pds_endpoint, expected_endpoint
363 );
364 return Err((
365 StatusCode::BAD_REQUEST,
366 Json(json!({
367 "error": "InvalidRequest",
368 "message": "DID document atproto_pds service endpoint does not match PDS public url"
369 })),
370 ));
371 }
372 }
373
374 Ok(())
375}
376
377pub async fn activate_account(
378 State(state): State<AppState>,
379 headers: axum::http::HeaderMap,
380) -> Response {
381 info!("[MIGRATION] activateAccount called");
382 let extracted = match crate::auth::extract_auth_token_from_header(
383 headers.get("Authorization").and_then(|h| h.to_str().ok()),
384 ) {
385 Some(t) => t,
386 None => {
387 info!("[MIGRATION] activateAccount: No auth token");
388 return ApiError::AuthenticationRequired.into_response();
389 }
390 };
391 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
392 let http_uri = format!(
393 "https://{}/xrpc/com.atproto.server.activateAccount",
394 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
395 );
396 let auth_user = match crate::auth::validate_token_with_dpop(
397 &state.db,
398 &extracted.token,
399 extracted.is_dpop,
400 dpop_proof,
401 "POST",
402 &http_uri,
403 true,
404 )
405 .await
406 {
407 Ok(user) => user,
408 Err(e) => {
409 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
410 return ApiError::from(e).into_response();
411 }
412 };
413 info!(
414 "[MIGRATION] activateAccount: Authenticated user did={}",
415 auth_user.did
416 );
417
418 if let Err(e) = crate::auth::scope_check::check_account_scope(
419 auth_user.is_oauth,
420 auth_user.scope.as_deref(),
421 crate::oauth::scopes::AccountAttr::Repo,
422 crate::oauth::scopes::AccountAction::Manage,
423 ) {
424 info!("[MIGRATION] activateAccount: Scope check failed");
425 return e;
426 }
427
428 let did = auth_user.did;
429
430 info!(
431 "[MIGRATION] activateAccount: Validating DID document for did={}",
432 did
433 );
434 let did_validation_start = std::time::Instant::now();
435 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await {
436 info!(
437 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
438 did,
439 did_validation_start.elapsed()
440 );
441 return (status, json).into_response();
442 }
443 info!(
444 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
445 did,
446 did_validation_start.elapsed()
447 );
448
449 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
450 .fetch_optional(&state.db)
451 .await
452 .ok()
453 .flatten();
454 info!(
455 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
456 did, handle
457 );
458 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
459 .execute(&state.db)
460 .await;
461 match result {
462 Ok(_) => {
463 info!(
464 "[MIGRATION] activateAccount: DB update success for did={}",
465 did
466 );
467 if let Some(ref h) = handle {
468 let _ = state.cache.delete(&format!("handle:{}", h)).await;
469 }
470 info!(
471 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
472 did
473 );
474 if let Err(e) =
475 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
476 {
477 warn!(
478 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
479 e
480 );
481 } else {
482 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
483 }
484 info!(
485 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
486 did, handle
487 );
488 if let Err(e) =
489 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
490 .await
491 {
492 warn!(
493 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
494 e
495 );
496 } else {
497 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
498 }
499 let repo_root = sqlx::query_scalar!(
500 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
501 did
502 )
503 .fetch_optional(&state.db)
504 .await
505 .ok()
506 .flatten();
507 if let Some(root_cid) = repo_root {
508 info!(
509 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
510 did, root_cid
511 );
512 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
513 if let Ok(Some(block)) = state.block_store.get(&cid).await {
514 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
515 } else {
516 None
517 }
518 } else {
519 None
520 };
521 if let Err(e) = crate::api::repo::record::sequence_sync_event(
522 &state,
523 &did,
524 &root_cid,
525 rev.as_deref(),
526 )
527 .await
528 {
529 warn!(
530 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
531 e
532 );
533 } else {
534 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
535 }
536 } else {
537 warn!(
538 "[MIGRATION] activateAccount: No repo root found for did={}",
539 did
540 );
541 }
542 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
543 (StatusCode::OK, Json(json!({}))).into_response()
544 }
545 Err(e) => {
546 error!(
547 "[MIGRATION] activateAccount: DB error activating account: {:?}",
548 e
549 );
550 (
551 StatusCode::INTERNAL_SERVER_ERROR,
552 Json(json!({"error": "InternalError"})),
553 )
554 .into_response()
555 }
556 }
557}
558
559#[derive(Deserialize)]
560#[serde(rename_all = "camelCase")]
561pub struct DeactivateAccountInput {
562 pub delete_after: Option<String>,
563}
564
565pub async fn deactivate_account(
566 State(state): State<AppState>,
567 headers: axum::http::HeaderMap,
568 Json(input): Json<DeactivateAccountInput>,
569) -> Response {
570 let extracted = match crate::auth::extract_auth_token_from_header(
571 headers.get("Authorization").and_then(|h| h.to_str().ok()),
572 ) {
573 Some(t) => t,
574 None => return ApiError::AuthenticationRequired.into_response(),
575 };
576 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
577 let http_uri = format!(
578 "https://{}/xrpc/com.atproto.server.deactivateAccount",
579 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
580 );
581 let auth_user = match crate::auth::validate_token_with_dpop(
582 &state.db,
583 &extracted.token,
584 extracted.is_dpop,
585 dpop_proof,
586 "POST",
587 &http_uri,
588 false,
589 )
590 .await
591 {
592 Ok(user) => user,
593 Err(e) => return ApiError::from(e).into_response(),
594 };
595
596 if let Err(e) = crate::auth::scope_check::check_account_scope(
597 auth_user.is_oauth,
598 auth_user.scope.as_deref(),
599 crate::oauth::scopes::AccountAttr::Repo,
600 crate::oauth::scopes::AccountAction::Manage,
601 ) {
602 return e;
603 }
604
605 let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
606 .delete_after
607 .as_ref()
608 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
609 .map(|dt| dt.with_timezone(&chrono::Utc));
610
611 let did = auth_user.did;
612 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
613 .fetch_optional(&state.db)
614 .await
615 .ok()
616 .flatten();
617 let result = sqlx::query!(
618 "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
619 did,
620 delete_after
621 )
622 .execute(&state.db)
623 .await;
624 match result {
625 Ok(_) => {
626 if let Some(ref h) = handle {
627 let _ = state.cache.delete(&format!("handle:{}", h)).await;
628 }
629 if let Err(e) = crate::api::repo::record::sequence_account_event(
630 &state,
631 &did,
632 false,
633 Some("deactivated"),
634 )
635 .await
636 {
637 warn!("Failed to sequence account deactivation event: {}", e);
638 }
639 (StatusCode::OK, Json(json!({}))).into_response()
640 }
641 Err(e) => {
642 error!("DB error deactivating account: {:?}", e);
643 (
644 StatusCode::INTERNAL_SERVER_ERROR,
645 Json(json!({"error": "InternalError"})),
646 )
647 .into_response()
648 }
649 }
650}
651
652pub async fn request_account_delete(
653 State(state): State<AppState>,
654 headers: axum::http::HeaderMap,
655) -> Response {
656 let extracted = match crate::auth::extract_auth_token_from_header(
657 headers.get("Authorization").and_then(|h| h.to_str().ok()),
658 ) {
659 Some(t) => t,
660 None => return ApiError::AuthenticationRequired.into_response(),
661 };
662 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
663 let http_uri = format!(
664 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
665 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
666 );
667 let validated = match crate::auth::validate_token_with_dpop(
668 &state.db,
669 &extracted.token,
670 extracted.is_dpop,
671 dpop_proof,
672 "POST",
673 &http_uri,
674 true,
675 )
676 .await
677 {
678 Ok(user) => user,
679 Err(e) => return ApiError::from(e).into_response(),
680 };
681 let did = validated.did.clone();
682
683 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await {
684 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await;
685 }
686
687 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
688 .fetch_optional(&state.db)
689 .await
690 {
691 Ok(Some(id)) => id,
692 _ => {
693 return (
694 StatusCode::INTERNAL_SERVER_ERROR,
695 Json(json!({"error": "InternalError"})),
696 )
697 .into_response();
698 }
699 };
700 let confirmation_token = Uuid::new_v4().to_string();
701 let expires_at = Utc::now() + Duration::minutes(15);
702 let insert = sqlx::query!(
703 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
704 confirmation_token,
705 did,
706 expires_at
707 )
708 .execute(&state.db)
709 .await;
710 if let Err(e) = insert {
711 error!("DB error creating deletion token: {:?}", e);
712 return (
713 StatusCode::INTERNAL_SERVER_ERROR,
714 Json(json!({"error": "InternalError"})),
715 )
716 .into_response();
717 }
718 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
719 if let Err(e) =
720 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
721 .await
722 {
723 warn!("Failed to enqueue account deletion notification: {:?}", e);
724 }
725 info!("Account deletion requested for user {}", did);
726 (StatusCode::OK, Json(json!({}))).into_response()
727}
728
729#[derive(Deserialize)]
730pub struct DeleteAccountInput {
731 pub did: String,
732 pub password: String,
733 pub token: String,
734}
735
736pub async fn delete_account(
737 State(state): State<AppState>,
738 Json(input): Json<DeleteAccountInput>,
739) -> Response {
740 let did = input.did.trim();
741 let password = &input.password;
742 let token = input.token.trim();
743 if did.is_empty() {
744 return (
745 StatusCode::BAD_REQUEST,
746 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
747 )
748 .into_response();
749 }
750 if password.is_empty() {
751 return (
752 StatusCode::BAD_REQUEST,
753 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
754 )
755 .into_response();
756 }
757 const OLD_PASSWORD_MAX_LENGTH: usize = 512;
758 if password.len() > OLD_PASSWORD_MAX_LENGTH {
759 return (
760 StatusCode::BAD_REQUEST,
761 Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})),
762 )
763 .into_response();
764 }
765 if token.is_empty() {
766 return (
767 StatusCode::BAD_REQUEST,
768 Json(json!({"error": "InvalidToken", "message": "token is required"})),
769 )
770 .into_response();
771 }
772 let user = sqlx::query!(
773 "SELECT id, password_hash, handle FROM users WHERE did = $1",
774 did
775 )
776 .fetch_optional(&state.db)
777 .await;
778 let (user_id, password_hash, handle) = match user {
779 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
780 Ok(None) => {
781 return (
782 StatusCode::BAD_REQUEST,
783 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
784 )
785 .into_response();
786 }
787 Err(e) => {
788 error!("DB error in delete_account: {:?}", e);
789 return (
790 StatusCode::INTERNAL_SERVER_ERROR,
791 Json(json!({"error": "InternalError"})),
792 )
793 .into_response();
794 }
795 };
796 let password_valid = if password_hash
797 .as_ref()
798 .map(|h| verify(password, h).unwrap_or(false))
799 .unwrap_or(false)
800 {
801 true
802 } else {
803 let app_pass_rows = sqlx::query!(
804 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
805 user_id
806 )
807 .fetch_all(&state.db)
808 .await
809 .unwrap_or_default();
810 app_pass_rows
811 .iter()
812 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
813 };
814 if !password_valid {
815 return (
816 StatusCode::UNAUTHORIZED,
817 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
818 )
819 .into_response();
820 }
821 let deletion_request = sqlx::query!(
822 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
823 token
824 )
825 .fetch_optional(&state.db)
826 .await;
827 let (token_did, expires_at) = match deletion_request {
828 Ok(Some(row)) => (row.did, row.expires_at),
829 Ok(None) => {
830 return (
831 StatusCode::BAD_REQUEST,
832 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
833 )
834 .into_response();
835 }
836 Err(e) => {
837 error!("DB error fetching deletion token: {:?}", e);
838 return (
839 StatusCode::INTERNAL_SERVER_ERROR,
840 Json(json!({"error": "InternalError"})),
841 )
842 .into_response();
843 }
844 };
845 if token_did != did {
846 return (
847 StatusCode::BAD_REQUEST,
848 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
849 )
850 .into_response();
851 }
852 if Utc::now() > expires_at {
853 let _ = sqlx::query!(
854 "DELETE FROM account_deletion_requests WHERE token = $1",
855 token
856 )
857 .execute(&state.db)
858 .await;
859 return (
860 StatusCode::BAD_REQUEST,
861 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
862 )
863 .into_response();
864 }
865 let mut tx = match state.db.begin().await {
866 Ok(tx) => tx,
867 Err(e) => {
868 error!("Failed to begin transaction: {:?}", e);
869 return (
870 StatusCode::INTERNAL_SERVER_ERROR,
871 Json(json!({"error": "InternalError"})),
872 )
873 .into_response();
874 }
875 };
876 let deletion_result: Result<(), sqlx::Error> = async {
877 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
878 .execute(&mut *tx)
879 .await?;
880 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
881 .execute(&mut *tx)
882 .await?;
883 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
884 .execute(&mut *tx)
885 .await?;
886 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
887 .execute(&mut *tx)
888 .await?;
889 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
890 .execute(&mut *tx)
891 .await?;
892 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
893 .execute(&mut *tx)
894 .await?;
895 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
896 .execute(&mut *tx)
897 .await?;
898 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
899 .execute(&mut *tx)
900 .await?;
901 Ok(())
902 }
903 .await;
904 match deletion_result {
905 Ok(()) => {
906 if let Err(e) = tx.commit().await {
907 error!("Failed to commit account deletion transaction: {:?}", e);
908 return (
909 StatusCode::INTERNAL_SERVER_ERROR,
910 Json(json!({"error": "InternalError"})),
911 )
912 .into_response();
913 }
914 let account_seq = crate::api::repo::record::sequence_account_event(
915 &state,
916 did,
917 false,
918 Some("deleted"),
919 )
920 .await;
921 match account_seq {
922 Ok(seq) => {
923 if let Err(e) = sqlx::query!(
924 "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
925 did,
926 seq
927 )
928 .execute(&state.db)
929 .await
930 {
931 warn!(
932 "Failed to cleanup sequences for deleted account {}: {}",
933 did, e
934 );
935 }
936 }
937 Err(e) => {
938 warn!(
939 "Failed to sequence account deletion event for {}: {}",
940 did, e
941 );
942 }
943 }
944 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
945 info!("Account {} deleted successfully", did);
946 (StatusCode::OK, Json(json!({}))).into_response()
947 }
948 Err(e) => {
949 error!("DB error deleting account, rolling back: {:?}", e);
950 (
951 StatusCode::INTERNAL_SERVER_ERROR,
952 Json(json!({"error": "InternalError"})),
953 )
954 .into_response()
955 }
956 }
957}