this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use bcrypt::verify;
10use chrono::{Duration, Utc};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14use uuid::Uuid;
15
16#[derive(Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct CheckAccountStatusOutput {
19 pub activated: bool,
20 pub valid_did: bool,
21 pub repo_commit: String,
22 pub repo_rev: String,
23 pub repo_blocks: i64,
24 pub indexed_records: i64,
25 pub private_state_values: i64,
26 pub expected_blobs: i64,
27 pub imported_blobs: i64,
28}
29
30pub async fn check_account_status(
31 State(state): State<AppState>,
32 headers: axum::http::HeaderMap,
33) -> Response {
34 let extracted = match crate::auth::extract_auth_token_from_header(
35 headers.get("Authorization").and_then(|h| h.to_str().ok()),
36 ) {
37 Some(t) => t,
38 None => return ApiError::AuthenticationRequired.into_response(),
39 };
40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
41 let http_uri = format!(
42 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
44 );
45 let did = match crate::auth::validate_token_with_dpop(
46 &state.db,
47 &extracted.token,
48 extracted.is_dpop,
49 dpop_proof,
50 "GET",
51 &http_uri,
52 true,
53 )
54 .await
55 {
56 Ok(user) => user.did,
57 Err(e) => return ApiError::from(e).into_response(),
58 };
59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
60 .fetch_optional(&state.db)
61 .await
62 {
63 Ok(Some(id)) => id,
64 _ => {
65 return (
66 StatusCode::INTERNAL_SERVER_ERROR,
67 Json(json!({"error": "InternalError"})),
68 )
69 .into_response();
70 }
71 };
72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
73 .fetch_optional(&state.db)
74 .await;
75 let deactivated_at = match user_status {
76 Ok(Some(row)) => row.deactivated_at,
77 _ => None,
78 };
79 let repo_result = sqlx::query!(
80 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
81 user_id
82 )
83 .fetch_optional(&state.db)
84 .await;
85 let repo_commit = match repo_result {
86 Ok(Some(row)) => row.repo_root_cid,
87 _ => String::new(),
88 };
89 let record_count: i64 =
90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
91 .fetch_one(&state.db)
92 .await
93 .unwrap_or(Some(0))
94 .unwrap_or(0);
95 let blob_count: i64 = sqlx::query_scalar!(
96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
97 user_id
98 )
99 .fetch_one(&state.db)
100 .await
101 .unwrap_or(Some(0))
102 .unwrap_or(0);
103 let valid_did = did.starts_with("did:");
104 (
105 StatusCode::OK,
106 Json(CheckAccountStatusOutput {
107 activated: deactivated_at.is_none(),
108 valid_did,
109 repo_commit: repo_commit.clone(),
110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
111 repo_blocks: 0,
112 indexed_records: record_count,
113 private_state_values: 0,
114 expected_blobs: blob_count,
115 imported_blobs: blob_count,
116 }),
117 )
118 .into_response()
119}
120
121pub async fn activate_account(
122 State(state): State<AppState>,
123 headers: axum::http::HeaderMap,
124) -> Response {
125 let extracted = match crate::auth::extract_auth_token_from_header(
126 headers.get("Authorization").and_then(|h| h.to_str().ok()),
127 ) {
128 Some(t) => t,
129 None => return ApiError::AuthenticationRequired.into_response(),
130 };
131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
132 let http_uri = format!(
133 "https://{}/xrpc/com.atproto.server.activateAccount",
134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
135 );
136 let did = match crate::auth::validate_token_with_dpop(
137 &state.db,
138 &extracted.token,
139 extracted.is_dpop,
140 dpop_proof,
141 "POST",
142 &http_uri,
143 true,
144 )
145 .await
146 {
147 Ok(user) => user.did,
148 Err(e) => return ApiError::from(e).into_response(),
149 };
150 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
151 .fetch_optional(&state.db)
152 .await
153 .ok()
154 .flatten();
155 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
156 .execute(&state.db)
157 .await;
158 match result {
159 Ok(_) => {
160 if let Some(h) = handle {
161 let _ = state.cache.delete(&format!("handle:{}", h)).await;
162 }
163 (StatusCode::OK, Json(json!({}))).into_response()
164 }
165 Err(e) => {
166 error!("DB error activating account: {:?}", e);
167 (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 Json(json!({"error": "InternalError"})),
170 )
171 .into_response()
172 }
173 }
174}
175
176#[derive(Deserialize)]
177#[serde(rename_all = "camelCase")]
178pub struct DeactivateAccountInput {
179 pub delete_after: Option<String>,
180}
181
182pub async fn deactivate_account(
183 State(state): State<AppState>,
184 headers: axum::http::HeaderMap,
185 Json(_input): Json<DeactivateAccountInput>,
186) -> Response {
187 let extracted = match crate::auth::extract_auth_token_from_header(
188 headers.get("Authorization").and_then(|h| h.to_str().ok()),
189 ) {
190 Some(t) => t,
191 None => return ApiError::AuthenticationRequired.into_response(),
192 };
193 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
194 let http_uri = format!(
195 "https://{}/xrpc/com.atproto.server.deactivateAccount",
196 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
197 );
198 let did = match crate::auth::validate_token_with_dpop(
199 &state.db,
200 &extracted.token,
201 extracted.is_dpop,
202 dpop_proof,
203 "POST",
204 &http_uri,
205 false,
206 )
207 .await
208 {
209 Ok(user) => user.did,
210 Err(e) => return ApiError::from(e).into_response(),
211 };
212 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
213 .fetch_optional(&state.db)
214 .await
215 .ok()
216 .flatten();
217 let result = sqlx::query!(
218 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
219 did
220 )
221 .execute(&state.db)
222 .await;
223 match result {
224 Ok(_) => {
225 if let Some(h) = handle {
226 let _ = state.cache.delete(&format!("handle:{}", h)).await;
227 }
228 (StatusCode::OK, Json(json!({}))).into_response()
229 }
230 Err(e) => {
231 error!("DB error deactivating account: {:?}", e);
232 (
233 StatusCode::INTERNAL_SERVER_ERROR,
234 Json(json!({"error": "InternalError"})),
235 )
236 .into_response()
237 }
238 }
239}
240
241pub async fn request_account_delete(
242 State(state): State<AppState>,
243 headers: axum::http::HeaderMap,
244) -> Response {
245 let extracted = match crate::auth::extract_auth_token_from_header(
246 headers.get("Authorization").and_then(|h| h.to_str().ok()),
247 ) {
248 Some(t) => t,
249 None => return ApiError::AuthenticationRequired.into_response(),
250 };
251 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
252 let http_uri = format!(
253 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
254 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
255 );
256 let did = match crate::auth::validate_token_with_dpop(
257 &state.db,
258 &extracted.token,
259 extracted.is_dpop,
260 dpop_proof,
261 "POST",
262 &http_uri,
263 true,
264 )
265 .await
266 {
267 Ok(user) => user.did,
268 Err(e) => return ApiError::from(e).into_response(),
269 };
270 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
271 .fetch_optional(&state.db)
272 .await
273 {
274 Ok(Some(id)) => id,
275 _ => {
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError"})),
279 )
280 .into_response();
281 }
282 };
283 let confirmation_token = Uuid::new_v4().to_string();
284 let expires_at = Utc::now() + Duration::minutes(15);
285 let insert = sqlx::query!(
286 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
287 confirmation_token,
288 did,
289 expires_at
290 )
291 .execute(&state.db)
292 .await;
293 if let Err(e) = insert {
294 error!("DB error creating deletion token: {:?}", e);
295 return (
296 StatusCode::INTERNAL_SERVER_ERROR,
297 Json(json!({"error": "InternalError"})),
298 )
299 .into_response();
300 }
301 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
302 if let Err(e) = crate::notifications::enqueue_account_deletion(
303 &state.db,
304 user_id,
305 &confirmation_token,
306 &hostname,
307 )
308 .await
309 {
310 warn!("Failed to enqueue account deletion notification: {:?}", e);
311 }
312 info!("Account deletion requested for user {}", did);
313 (StatusCode::OK, Json(json!({}))).into_response()
314}
315
316#[derive(Deserialize)]
317pub struct DeleteAccountInput {
318 pub did: String,
319 pub password: String,
320 pub token: String,
321}
322
323pub async fn delete_account(
324 State(state): State<AppState>,
325 Json(input): Json<DeleteAccountInput>,
326) -> Response {
327 let did = input.did.trim();
328 let password = &input.password;
329 let token = input.token.trim();
330 if did.is_empty() {
331 return (
332 StatusCode::BAD_REQUEST,
333 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
334 )
335 .into_response();
336 }
337 if password.is_empty() {
338 return (
339 StatusCode::BAD_REQUEST,
340 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
341 )
342 .into_response();
343 }
344 if token.is_empty() {
345 return (
346 StatusCode::BAD_REQUEST,
347 Json(json!({"error": "InvalidToken", "message": "token is required"})),
348 )
349 .into_response();
350 }
351 let user = sqlx::query!(
352 "SELECT id, password_hash, handle FROM users WHERE did = $1",
353 did
354 )
355 .fetch_optional(&state.db)
356 .await;
357 let (user_id, password_hash, handle) = match user {
358 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
359 Ok(None) => {
360 return (
361 StatusCode::BAD_REQUEST,
362 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
363 )
364 .into_response();
365 }
366 Err(e) => {
367 error!("DB error in delete_account: {:?}", e);
368 return (
369 StatusCode::INTERNAL_SERVER_ERROR,
370 Json(json!({"error": "InternalError"})),
371 )
372 .into_response();
373 }
374 };
375 let password_valid = if verify(password, &password_hash).unwrap_or(false) {
376 true
377 } else {
378 let app_pass_rows = sqlx::query!(
379 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
380 user_id
381 )
382 .fetch_all(&state.db)
383 .await
384 .unwrap_or_default();
385 app_pass_rows
386 .iter()
387 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
388 };
389 if !password_valid {
390 return (
391 StatusCode::UNAUTHORIZED,
392 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
393 )
394 .into_response();
395 }
396 let deletion_request = sqlx::query!(
397 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
398 token
399 )
400 .fetch_optional(&state.db)
401 .await;
402 let (token_did, expires_at) = match deletion_request {
403 Ok(Some(row)) => (row.did, row.expires_at),
404 Ok(None) => {
405 return (
406 StatusCode::BAD_REQUEST,
407 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
408 )
409 .into_response();
410 }
411 Err(e) => {
412 error!("DB error fetching deletion token: {:?}", e);
413 return (
414 StatusCode::INTERNAL_SERVER_ERROR,
415 Json(json!({"error": "InternalError"})),
416 )
417 .into_response();
418 }
419 };
420 if token_did != did {
421 return (
422 StatusCode::BAD_REQUEST,
423 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
424 )
425 .into_response();
426 }
427 if Utc::now() > expires_at {
428 let _ = sqlx::query!(
429 "DELETE FROM account_deletion_requests WHERE token = $1",
430 token
431 )
432 .execute(&state.db)
433 .await;
434 return (
435 StatusCode::BAD_REQUEST,
436 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
437 )
438 .into_response();
439 }
440 let mut tx = match state.db.begin().await {
441 Ok(tx) => tx,
442 Err(e) => {
443 error!("Failed to begin transaction: {:?}", e);
444 return (
445 StatusCode::INTERNAL_SERVER_ERROR,
446 Json(json!({"error": "InternalError"})),
447 )
448 .into_response();
449 }
450 };
451 let deletion_result: Result<(), sqlx::Error> = async {
452 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
453 .execute(&mut *tx)
454 .await?;
455 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
456 .execute(&mut *tx)
457 .await?;
458 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
459 .execute(&mut *tx)
460 .await?;
461 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
462 .execute(&mut *tx)
463 .await?;
464 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
465 .execute(&mut *tx)
466 .await?;
467 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
468 .execute(&mut *tx)
469 .await?;
470 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
471 .execute(&mut *tx)
472 .await?;
473 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
474 .execute(&mut *tx)
475 .await?;
476 Ok(())
477 }
478 .await;
479 match deletion_result {
480 Ok(()) => {
481 if let Err(e) = tx.commit().await {
482 error!("Failed to commit account deletion transaction: {:?}", e);
483 return (
484 StatusCode::INTERNAL_SERVER_ERROR,
485 Json(json!({"error": "InternalError"})),
486 )
487 .into_response();
488 }
489 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
490 info!("Account {} deleted successfully", did);
491 (StatusCode::OK, Json(json!({}))).into_response()
492 }
493 Err(e) => {
494 error!("DB error deleting account, rolling back: {:?}", e);
495 (
496 StatusCode::INTERNAL_SERVER_ERROR,
497 Json(json!({"error": "InternalError"})),
498 )
499 .into_response()
500 }
501 }
502}