this repo has no description
1use crate::api::ApiError;
2use crate::plc::PlcClient;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use bcrypt::verify;
11use chrono::{Duration, Utc};
12use cid::Cid;
13use jacquard_repo::commit::Commit;
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use tracing::{error, info, warn};
20use uuid::Uuid;
21
22#[derive(Serialize)]
23#[serde(rename_all = "camelCase")]
24pub struct CheckAccountStatusOutput {
25 pub activated: bool,
26 pub valid_did: bool,
27 pub repo_commit: String,
28 pub repo_rev: String,
29 pub repo_blocks: i64,
30 pub indexed_records: i64,
31 pub private_state_values: i64,
32 pub expected_blobs: i64,
33 pub imported_blobs: i64,
34}
35
36pub async fn check_account_status(
37 State(state): State<AppState>,
38 headers: axum::http::HeaderMap,
39) -> Response {
40 let extracted = match crate::auth::extract_auth_token_from_header(
41 headers.get("Authorization").and_then(|h| h.to_str().ok()),
42 ) {
43 Some(t) => t,
44 None => return ApiError::AuthenticationRequired.into_response(),
45 };
46 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
47 let http_uri = format!(
48 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
49 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
50 );
51 let did = match crate::auth::validate_token_with_dpop(
52 &state.db,
53 &extracted.token,
54 extracted.is_dpop,
55 dpop_proof,
56 "GET",
57 &http_uri,
58 true,
59 )
60 .await
61 {
62 Ok(user) => user.did,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(id)) => id,
70 _ => {
71 return (
72 StatusCode::INTERNAL_SERVER_ERROR,
73 Json(json!({"error": "InternalError"})),
74 )
75 .into_response();
76 }
77 };
78 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
79 .fetch_optional(&state.db)
80 .await;
81 let deactivated_at = match user_status {
82 Ok(Some(row)) => row.deactivated_at,
83 _ => None,
84 };
85 let repo_result = sqlx::query!(
86 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
87 user_id
88 )
89 .fetch_optional(&state.db)
90 .await;
91 let repo_commit = match repo_result {
92 Ok(Some(row)) => row.repo_root_cid,
93 _ => String::new(),
94 };
95 let record_count: i64 =
96 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
97 .fetch_one(&state.db)
98 .await
99 .unwrap_or(Some(0))
100 .unwrap_or(0);
101 let blob_count: i64 = sqlx::query_scalar!(
102 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
103 user_id
104 )
105 .fetch_one(&state.db)
106 .await
107 .unwrap_or(Some(0))
108 .unwrap_or(0);
109 let valid_did = did.starts_with("did:");
110 (
111 StatusCode::OK,
112 Json(CheckAccountStatusOutput {
113 activated: deactivated_at.is_none(),
114 valid_did,
115 repo_commit: repo_commit.clone(),
116 repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
117 repo_blocks: 0,
118 indexed_records: record_count,
119 private_state_values: 0,
120 expected_blobs: blob_count,
121 imported_blobs: blob_count,
122 }),
123 )
124 .into_response()
125}
126
127async fn assert_valid_did_document_for_service(
128 db: &sqlx::PgPool,
129 did: &str,
130) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
131 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
132 let expected_endpoint = format!("https://{}", hostname);
133
134 if did.starts_with("did:plc:") {
135 let plc_client = PlcClient::new(None);
136
137 let mut last_error = None;
138 let mut doc_data = None;
139 for attempt in 0..5 {
140 if attempt > 0 {
141 let delay_ms = 500 * (1 << (attempt - 1));
142 info!(
143 "Waiting {}ms before retry {} for DID document validation ({})",
144 delay_ms, attempt, did
145 );
146 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
147 }
148
149 match plc_client.get_document_data(did).await {
150 Ok(data) => {
151 let pds_endpoint = data
152 .get("services")
153 .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
154 .and_then(|p| p.get("endpoint"))
155 .and_then(|e| e.as_str());
156
157 if pds_endpoint == Some(&expected_endpoint) {
158 doc_data = Some(data);
159 break;
160 } else {
161 info!(
162 "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying",
163 attempt + 1,
164 did,
165 pds_endpoint,
166 expected_endpoint
167 );
168 last_error = Some(format!(
169 "DID document endpoint {:?} does not match expected {}",
170 pds_endpoint, expected_endpoint
171 ));
172 }
173 }
174 Err(e) => {
175 warn!(
176 "Attempt {}: Failed to fetch PLC document for {}: {:?}",
177 attempt + 1,
178 did,
179 e
180 );
181 last_error = Some(format!("Could not resolve DID document: {}", e));
182 }
183 }
184 }
185
186 let doc_data = match doc_data {
187 Some(d) => d,
188 None => {
189 return Err((
190 StatusCode::BAD_REQUEST,
191 Json(json!({
192 "error": "InvalidRequest",
193 "message": last_error.unwrap_or_else(|| "DID document validation failed".to_string())
194 })),
195 ));
196 }
197 };
198
199 let doc_signing_key = doc_data
200 .get("verificationMethods")
201 .and_then(|v| v.get("atproto"))
202 .and_then(|k| k.as_str());
203
204 let user_row = sqlx::query!(
205 "SELECT uk.key_bytes, uk.encryption_version FROM user_keys uk JOIN users u ON uk.user_id = u.id WHERE u.did = $1",
206 did
207 )
208 .fetch_optional(db)
209 .await
210 .map_err(|e| {
211 error!("Failed to fetch user key: {:?}", e);
212 (
213 StatusCode::INTERNAL_SERVER_ERROR,
214 Json(json!({"error": "InternalError"})),
215 )
216 })?;
217
218 if let Some(row) = user_row {
219 let key_bytes = crate::config::decrypt_key(&row.key_bytes, row.encryption_version)
220 .map_err(|e| {
221 error!("Failed to decrypt user key: {}", e);
222 (
223 StatusCode::INTERNAL_SERVER_ERROR,
224 Json(json!({"error": "InternalError"})),
225 )
226 })?;
227 let signing_key = SigningKey::from_slice(&key_bytes).map_err(|e| {
228 error!("Failed to create signing key: {:?}", e);
229 (
230 StatusCode::INTERNAL_SERVER_ERROR,
231 Json(json!({"error": "InternalError"})),
232 )
233 })?;
234 let expected_did_key = crate::plc::signing_key_to_did_key(&signing_key);
235
236 if doc_signing_key != Some(&expected_did_key) {
237 warn!(
238 "DID {} has signing key {:?}, expected {}",
239 did, doc_signing_key, expected_did_key
240 );
241 return Err((
242 StatusCode::BAD_REQUEST,
243 Json(json!({
244 "error": "InvalidRequest",
245 "message": "DID document verification method does not match expected signing key"
246 })),
247 ));
248 }
249 }
250 } else if let Some(host_and_path) = did.strip_prefix("did:web:") {
251 let client = reqwest::Client::new();
252 let decoded = host_and_path.replace("%3A", ":");
253 let parts: Vec<&str> = decoded.split(':').collect();
254 let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit())
255 {
256 (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec())
257 } else {
258 (parts[0].to_string(), parts[1..].to_vec())
259 };
260 let scheme =
261 if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') {
262 "http"
263 } else {
264 "https"
265 };
266 let url = if path_parts.is_empty() {
267 format!("{}://{}/.well-known/did.json", scheme, host)
268 } else {
269 format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/"))
270 };
271 let resp = client.get(&url).send().await.map_err(|e| {
272 warn!("Failed to fetch did:web document for {}: {:?}", did, e);
273 (
274 StatusCode::BAD_REQUEST,
275 Json(json!({
276 "error": "InvalidRequest",
277 "message": format!("Could not resolve DID document: {}", e)
278 })),
279 )
280 })?;
281 let doc: serde_json::Value = resp.json().await.map_err(|e| {
282 warn!("Failed to parse did:web document for {}: {:?}", did, e);
283 (
284 StatusCode::BAD_REQUEST,
285 Json(json!({
286 "error": "InvalidRequest",
287 "message": format!("Could not parse DID document: {}", e)
288 })),
289 )
290 })?;
291
292 let pds_endpoint = doc
293 .get("service")
294 .and_then(|s| s.as_array())
295 .and_then(|arr| {
296 arr.iter().find(|svc| {
297 svc.get("id").and_then(|id| id.as_str()) == Some("#atproto_pds")
298 || svc.get("type").and_then(|t| t.as_str())
299 == Some("AtprotoPersonalDataServer")
300 })
301 })
302 .and_then(|svc| svc.get("serviceEndpoint"))
303 .and_then(|e| e.as_str());
304
305 if pds_endpoint != Some(&expected_endpoint) {
306 warn!(
307 "DID {} has endpoint {:?}, expected {}",
308 did, pds_endpoint, expected_endpoint
309 );
310 return Err((
311 StatusCode::BAD_REQUEST,
312 Json(json!({
313 "error": "InvalidRequest",
314 "message": "DID document atproto_pds service endpoint does not match PDS public url"
315 })),
316 ));
317 }
318 }
319
320 Ok(())
321}
322
323pub async fn activate_account(
324 State(state): State<AppState>,
325 headers: axum::http::HeaderMap,
326) -> Response {
327 info!("[MIGRATION] activateAccount called");
328 let extracted = match crate::auth::extract_auth_token_from_header(
329 headers.get("Authorization").and_then(|h| h.to_str().ok()),
330 ) {
331 Some(t) => t,
332 None => {
333 info!("[MIGRATION] activateAccount: No auth token");
334 return ApiError::AuthenticationRequired.into_response();
335 }
336 };
337 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
338 let http_uri = format!(
339 "https://{}/xrpc/com.atproto.server.activateAccount",
340 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
341 );
342 let auth_user = match crate::auth::validate_token_with_dpop(
343 &state.db,
344 &extracted.token,
345 extracted.is_dpop,
346 dpop_proof,
347 "POST",
348 &http_uri,
349 true,
350 )
351 .await
352 {
353 Ok(user) => user,
354 Err(e) => {
355 info!("[MIGRATION] activateAccount: Auth failed: {:?}", e);
356 return ApiError::from(e).into_response();
357 }
358 };
359 info!(
360 "[MIGRATION] activateAccount: Authenticated user did={}",
361 auth_user.did
362 );
363
364 if let Err(e) = crate::auth::scope_check::check_account_scope(
365 auth_user.is_oauth,
366 auth_user.scope.as_deref(),
367 crate::oauth::scopes::AccountAttr::Repo,
368 crate::oauth::scopes::AccountAction::Manage,
369 ) {
370 info!("[MIGRATION] activateAccount: Scope check failed");
371 return e;
372 }
373
374 let did = auth_user.did;
375
376 info!(
377 "[MIGRATION] activateAccount: Validating DID document for did={}",
378 did
379 );
380 let did_validation_start = std::time::Instant::now();
381 if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did).await {
382 info!(
383 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
384 did,
385 did_validation_start.elapsed()
386 );
387 return (status, json).into_response();
388 }
389 info!(
390 "[MIGRATION] activateAccount: DID document validation SUCCESS for {} (took {:?})",
391 did,
392 did_validation_start.elapsed()
393 );
394
395 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
396 .fetch_optional(&state.db)
397 .await
398 .ok()
399 .flatten();
400 info!(
401 "[MIGRATION] activateAccount: Activating account did={} handle={:?}",
402 did, handle
403 );
404 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
405 .execute(&state.db)
406 .await;
407 match result {
408 Ok(_) => {
409 info!(
410 "[MIGRATION] activateAccount: DB update success for did={}",
411 did
412 );
413 if let Some(ref h) = handle {
414 let _ = state.cache.delete(&format!("handle:{}", h)).await;
415 }
416 info!(
417 "[MIGRATION] activateAccount: Sequencing account event (active=true) for did={}",
418 did
419 );
420 if let Err(e) =
421 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
422 {
423 warn!(
424 "[MIGRATION] activateAccount: Failed to sequence account activation event: {}",
425 e
426 );
427 } else {
428 info!("[MIGRATION] activateAccount: Account event sequenced successfully");
429 }
430 info!(
431 "[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
432 did, handle
433 );
434 if let Err(e) =
435 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
436 .await
437 {
438 warn!(
439 "[MIGRATION] activateAccount: Failed to sequence identity event for activation: {}",
440 e
441 );
442 } else {
443 info!("[MIGRATION] activateAccount: Identity event sequenced successfully");
444 }
445 let repo_root = sqlx::query_scalar!(
446 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
447 did
448 )
449 .fetch_optional(&state.db)
450 .await
451 .ok()
452 .flatten();
453 if let Some(root_cid) = repo_root {
454 info!(
455 "[MIGRATION] activateAccount: Sequencing sync event for did={} root_cid={}",
456 did, root_cid
457 );
458 let rev = if let Ok(cid) = Cid::from_str(&root_cid) {
459 if let Ok(Some(block)) = state.block_store.get(&cid).await {
460 Commit::from_cbor(&block).ok().map(|c| c.rev().to_string())
461 } else {
462 None
463 }
464 } else {
465 None
466 };
467 if let Err(e) = crate::api::repo::record::sequence_sync_event(
468 &state,
469 &did,
470 &root_cid,
471 rev.as_deref(),
472 )
473 .await
474 {
475 warn!(
476 "[MIGRATION] activateAccount: Failed to sequence sync event for activation: {}",
477 e
478 );
479 } else {
480 info!("[MIGRATION] activateAccount: Sync event sequenced successfully");
481 }
482 } else {
483 warn!(
484 "[MIGRATION] activateAccount: No repo root found for did={}",
485 did
486 );
487 }
488 info!("[MIGRATION] activateAccount: SUCCESS for did={}", did);
489 (StatusCode::OK, Json(json!({}))).into_response()
490 }
491 Err(e) => {
492 error!(
493 "[MIGRATION] activateAccount: DB error activating account: {:?}",
494 e
495 );
496 (
497 StatusCode::INTERNAL_SERVER_ERROR,
498 Json(json!({"error": "InternalError"})),
499 )
500 .into_response()
501 }
502 }
503}
504
505#[derive(Deserialize)]
506#[serde(rename_all = "camelCase")]
507pub struct DeactivateAccountInput {
508 pub delete_after: Option<String>,
509}
510
511pub async fn deactivate_account(
512 State(state): State<AppState>,
513 headers: axum::http::HeaderMap,
514 Json(_input): Json<DeactivateAccountInput>,
515) -> Response {
516 let extracted = match crate::auth::extract_auth_token_from_header(
517 headers.get("Authorization").and_then(|h| h.to_str().ok()),
518 ) {
519 Some(t) => t,
520 None => return ApiError::AuthenticationRequired.into_response(),
521 };
522 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
523 let http_uri = format!(
524 "https://{}/xrpc/com.atproto.server.deactivateAccount",
525 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
526 );
527 let auth_user = match crate::auth::validate_token_with_dpop(
528 &state.db,
529 &extracted.token,
530 extracted.is_dpop,
531 dpop_proof,
532 "POST",
533 &http_uri,
534 false,
535 )
536 .await
537 {
538 Ok(user) => user,
539 Err(e) => return ApiError::from(e).into_response(),
540 };
541
542 if let Err(e) = crate::auth::scope_check::check_account_scope(
543 auth_user.is_oauth,
544 auth_user.scope.as_deref(),
545 crate::oauth::scopes::AccountAttr::Repo,
546 crate::oauth::scopes::AccountAction::Manage,
547 ) {
548 return e;
549 }
550
551 let did = auth_user.did;
552 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
553 .fetch_optional(&state.db)
554 .await
555 .ok()
556 .flatten();
557 let result = sqlx::query!(
558 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
559 did
560 )
561 .execute(&state.db)
562 .await;
563 match result {
564 Ok(_) => {
565 if let Some(ref h) = handle {
566 let _ = state.cache.delete(&format!("handle:{}", h)).await;
567 }
568 if let Err(e) = crate::api::repo::record::sequence_account_event(
569 &state,
570 &did,
571 false,
572 Some("deactivated"),
573 )
574 .await
575 {
576 warn!("Failed to sequence account deactivation event: {}", e);
577 }
578 (StatusCode::OK, Json(json!({}))).into_response()
579 }
580 Err(e) => {
581 error!("DB error deactivating account: {:?}", e);
582 (
583 StatusCode::INTERNAL_SERVER_ERROR,
584 Json(json!({"error": "InternalError"})),
585 )
586 .into_response()
587 }
588 }
589}
590
591pub async fn request_account_delete(
592 State(state): State<AppState>,
593 headers: axum::http::HeaderMap,
594) -> Response {
595 let extracted = match crate::auth::extract_auth_token_from_header(
596 headers.get("Authorization").and_then(|h| h.to_str().ok()),
597 ) {
598 Some(t) => t,
599 None => return ApiError::AuthenticationRequired.into_response(),
600 };
601 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
602 let http_uri = format!(
603 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
604 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
605 );
606 let validated = match crate::auth::validate_token_with_dpop(
607 &state.db,
608 &extracted.token,
609 extracted.is_dpop,
610 dpop_proof,
611 "POST",
612 &http_uri,
613 true,
614 )
615 .await
616 {
617 Ok(user) => user,
618 Err(e) => return ApiError::from(e).into_response(),
619 };
620 let did = validated.did.clone();
621
622 if !crate::api::server::reauth::check_legacy_session_mfa(&state.db, &did).await {
623 return crate::api::server::reauth::legacy_mfa_required_response(&state.db, &did).await;
624 }
625
626 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
627 .fetch_optional(&state.db)
628 .await
629 {
630 Ok(Some(id)) => id,
631 _ => {
632 return (
633 StatusCode::INTERNAL_SERVER_ERROR,
634 Json(json!({"error": "InternalError"})),
635 )
636 .into_response();
637 }
638 };
639 let confirmation_token = Uuid::new_v4().to_string();
640 let expires_at = Utc::now() + Duration::minutes(15);
641 let insert = sqlx::query!(
642 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
643 confirmation_token,
644 did,
645 expires_at
646 )
647 .execute(&state.db)
648 .await;
649 if let Err(e) = insert {
650 error!("DB error creating deletion token: {:?}", e);
651 return (
652 StatusCode::INTERNAL_SERVER_ERROR,
653 Json(json!({"error": "InternalError"})),
654 )
655 .into_response();
656 }
657 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
658 if let Err(e) =
659 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
660 .await
661 {
662 warn!("Failed to enqueue account deletion notification: {:?}", e);
663 }
664 info!("Account deletion requested for user {}", did);
665 (StatusCode::OK, Json(json!({}))).into_response()
666}
667
668#[derive(Deserialize)]
669pub struct DeleteAccountInput {
670 pub did: String,
671 pub password: String,
672 pub token: String,
673}
674
675pub async fn delete_account(
676 State(state): State<AppState>,
677 Json(input): Json<DeleteAccountInput>,
678) -> Response {
679 let did = input.did.trim();
680 let password = &input.password;
681 let token = input.token.trim();
682 if did.is_empty() {
683 return (
684 StatusCode::BAD_REQUEST,
685 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
686 )
687 .into_response();
688 }
689 if password.is_empty() {
690 return (
691 StatusCode::BAD_REQUEST,
692 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
693 )
694 .into_response();
695 }
696 if token.is_empty() {
697 return (
698 StatusCode::BAD_REQUEST,
699 Json(json!({"error": "InvalidToken", "message": "token is required"})),
700 )
701 .into_response();
702 }
703 let user = sqlx::query!(
704 "SELECT id, password_hash, handle FROM users WHERE did = $1",
705 did
706 )
707 .fetch_optional(&state.db)
708 .await;
709 let (user_id, password_hash, handle) = match user {
710 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
711 Ok(None) => {
712 return (
713 StatusCode::BAD_REQUEST,
714 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
715 )
716 .into_response();
717 }
718 Err(e) => {
719 error!("DB error in delete_account: {:?}", e);
720 return (
721 StatusCode::INTERNAL_SERVER_ERROR,
722 Json(json!({"error": "InternalError"})),
723 )
724 .into_response();
725 }
726 };
727 let password_valid = if password_hash
728 .as_ref()
729 .map(|h| verify(password, h).unwrap_or(false))
730 .unwrap_or(false)
731 {
732 true
733 } else {
734 let app_pass_rows = sqlx::query!(
735 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
736 user_id
737 )
738 .fetch_all(&state.db)
739 .await
740 .unwrap_or_default();
741 app_pass_rows
742 .iter()
743 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
744 };
745 if !password_valid {
746 return (
747 StatusCode::UNAUTHORIZED,
748 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
749 )
750 .into_response();
751 }
752 let deletion_request = sqlx::query!(
753 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
754 token
755 )
756 .fetch_optional(&state.db)
757 .await;
758 let (token_did, expires_at) = match deletion_request {
759 Ok(Some(row)) => (row.did, row.expires_at),
760 Ok(None) => {
761 return (
762 StatusCode::BAD_REQUEST,
763 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
764 )
765 .into_response();
766 }
767 Err(e) => {
768 error!("DB error fetching deletion token: {:?}", e);
769 return (
770 StatusCode::INTERNAL_SERVER_ERROR,
771 Json(json!({"error": "InternalError"})),
772 )
773 .into_response();
774 }
775 };
776 if token_did != did {
777 return (
778 StatusCode::BAD_REQUEST,
779 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
780 )
781 .into_response();
782 }
783 if Utc::now() > expires_at {
784 let _ = sqlx::query!(
785 "DELETE FROM account_deletion_requests WHERE token = $1",
786 token
787 )
788 .execute(&state.db)
789 .await;
790 return (
791 StatusCode::BAD_REQUEST,
792 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
793 )
794 .into_response();
795 }
796 let mut tx = match state.db.begin().await {
797 Ok(tx) => tx,
798 Err(e) => {
799 error!("Failed to begin transaction: {:?}", e);
800 return (
801 StatusCode::INTERNAL_SERVER_ERROR,
802 Json(json!({"error": "InternalError"})),
803 )
804 .into_response();
805 }
806 };
807 let deletion_result: Result<(), sqlx::Error> = async {
808 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
809 .execute(&mut *tx)
810 .await?;
811 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
812 .execute(&mut *tx)
813 .await?;
814 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
815 .execute(&mut *tx)
816 .await?;
817 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
818 .execute(&mut *tx)
819 .await?;
820 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
821 .execute(&mut *tx)
822 .await?;
823 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
824 .execute(&mut *tx)
825 .await?;
826 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
827 .execute(&mut *tx)
828 .await?;
829 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
830 .execute(&mut *tx)
831 .await?;
832 Ok(())
833 }
834 .await;
835 match deletion_result {
836 Ok(()) => {
837 if let Err(e) = tx.commit().await {
838 error!("Failed to commit account deletion transaction: {:?}", e);
839 return (
840 StatusCode::INTERNAL_SERVER_ERROR,
841 Json(json!({"error": "InternalError"})),
842 )
843 .into_response();
844 }
845 if let Err(e) = crate::api::repo::record::sequence_account_event(
846 &state,
847 did,
848 false,
849 Some("deleted"),
850 )
851 .await
852 {
853 warn!(
854 "Failed to sequence account deletion event for {}: {}",
855 did, e
856 );
857 }
858 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
859 info!("Account {} deleted successfully", did);
860 (StatusCode::OK, Json(json!({}))).into_response()
861 }
862 Err(e) => {
863 error!("DB error deleting account, rolling back: {:?}", e);
864 (
865 StatusCode::INTERNAL_SERVER_ERROR,
866 Json(json!({"error": "InternalError"})),
867 )
868 .into_response()
869 }
870 }
871}