this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use bcrypt::verify;
10use chrono::{Duration, Utc};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14use uuid::Uuid;
15
16#[derive(Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct CheckAccountStatusOutput {
19 pub activated: bool,
20 pub valid_did: bool,
21 pub repo_commit: String,
22 pub repo_rev: String,
23 pub repo_blocks: i64,
24 pub indexed_records: i64,
25 pub private_state_values: i64,
26 pub expected_blobs: i64,
27 pub imported_blobs: i64,
28}
29
30pub async fn check_account_status(
31 State(state): State<AppState>,
32 headers: axum::http::HeaderMap,
33) -> Response {
34 let extracted = match crate::auth::extract_auth_token_from_header(
35 headers.get("Authorization").and_then(|h| h.to_str().ok())
36 ) {
37 Some(t) => t,
38 None => return ApiError::AuthenticationRequired.into_response(),
39 };
40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
41 let http_uri = format!("https://{}/xrpc/com.atproto.server.checkAccountStatus",
42 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
43 let did = match crate::auth::validate_token_with_dpop(
44 &state.db,
45 &extracted.token,
46 extracted.is_dpop,
47 dpop_proof,
48 "GET",
49 &http_uri,
50 true,
51 ).await {
52 Ok(user) => user.did,
53 Err(e) => return ApiError::from(e).into_response(),
54 };
55 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
56 .fetch_optional(&state.db)
57 .await
58 {
59 Ok(Some(id)) => id,
60 _ => {
61 return (
62 StatusCode::INTERNAL_SERVER_ERROR,
63 Json(json!({"error": "InternalError"})),
64 )
65 .into_response();
66 }
67 };
68 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
69 .fetch_optional(&state.db)
70 .await;
71 let deactivated_at = match user_status {
72 Ok(Some(row)) => row.deactivated_at,
73 _ => None,
74 };
75 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
76 .fetch_optional(&state.db)
77 .await;
78 let repo_commit = match repo_result {
79 Ok(Some(row)) => row.repo_root_cid,
80 _ => String::new(),
81 };
82 let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
83 .fetch_one(&state.db)
84 .await
85 .unwrap_or(Some(0))
86 .unwrap_or(0);
87 let blob_count: i64 =
88 sqlx::query_scalar!("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", user_id)
89 .fetch_one(&state.db)
90 .await
91 .unwrap_or(Some(0))
92 .unwrap_or(0);
93 let valid_did = did.starts_with("did:");
94 (
95 StatusCode::OK,
96 Json(CheckAccountStatusOutput {
97 activated: deactivated_at.is_none(),
98 valid_did,
99 repo_commit: repo_commit.clone(),
100 repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
101 repo_blocks: 0,
102 indexed_records: record_count,
103 private_state_values: 0,
104 expected_blobs: blob_count,
105 imported_blobs: blob_count,
106 }),
107 )
108 .into_response()
109}
110
111pub async fn activate_account(
112 State(state): State<AppState>,
113 headers: axum::http::HeaderMap,
114) -> Response {
115 let extracted = match crate::auth::extract_auth_token_from_header(
116 headers.get("Authorization").and_then(|h| h.to_str().ok())
117 ) {
118 Some(t) => t,
119 None => return ApiError::AuthenticationRequired.into_response(),
120 };
121 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
122 let http_uri = format!("https://{}/xrpc/com.atproto.server.activateAccount",
123 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
124 let did = match crate::auth::validate_token_with_dpop(
125 &state.db,
126 &extracted.token,
127 extracted.is_dpop,
128 dpop_proof,
129 "POST",
130 &http_uri,
131 true,
132 ).await {
133 Ok(user) => user.did,
134 Err(e) => return ApiError::from(e).into_response(),
135 };
136 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
137 .fetch_optional(&state.db)
138 .await
139 .ok()
140 .flatten();
141 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
142 .execute(&state.db)
143 .await;
144 match result {
145 Ok(_) => {
146 if let Some(h) = handle {
147 let _ = state.cache.delete(&format!("handle:{}", h)).await;
148 }
149 (StatusCode::OK, Json(json!({}))).into_response()
150 }
151 Err(e) => {
152 error!("DB error activating account: {:?}", e);
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError"})),
156 )
157 .into_response()
158 }
159 }
160}
161
162#[derive(Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct DeactivateAccountInput {
165 pub delete_after: Option<String>,
166}
167
168pub async fn deactivate_account(
169 State(state): State<AppState>,
170 headers: axum::http::HeaderMap,
171 Json(_input): Json<DeactivateAccountInput>,
172) -> Response {
173 let extracted = match crate::auth::extract_auth_token_from_header(
174 headers.get("Authorization").and_then(|h| h.to_str().ok())
175 ) {
176 Some(t) => t,
177 None => return ApiError::AuthenticationRequired.into_response(),
178 };
179 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
180 let http_uri = format!("https://{}/xrpc/com.atproto.server.deactivateAccount",
181 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
182 let did = match crate::auth::validate_token_with_dpop(
183 &state.db,
184 &extracted.token,
185 extracted.is_dpop,
186 dpop_proof,
187 "POST",
188 &http_uri,
189 false,
190 ).await {
191 Ok(user) => user.did,
192 Err(e) => return ApiError::from(e).into_response(),
193 };
194 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
195 .fetch_optional(&state.db)
196 .await
197 .ok()
198 .flatten();
199 let result = sqlx::query!("UPDATE users SET deactivated_at = NOW() WHERE did = $1", did)
200 .execute(&state.db)
201 .await;
202 match result {
203 Ok(_) => {
204 if let Some(h) = handle {
205 let _ = state.cache.delete(&format!("handle:{}", h)).await;
206 }
207 (StatusCode::OK, Json(json!({}))).into_response()
208 }
209 Err(e) => {
210 error!("DB error deactivating account: {:?}", e);
211 (
212 StatusCode::INTERNAL_SERVER_ERROR,
213 Json(json!({"error": "InternalError"})),
214 )
215 .into_response()
216 }
217 }
218}
219
220pub async fn request_account_delete(
221 State(state): State<AppState>,
222 headers: axum::http::HeaderMap,
223) -> Response {
224 let extracted = match crate::auth::extract_auth_token_from_header(
225 headers.get("Authorization").and_then(|h| h.to_str().ok())
226 ) {
227 Some(t) => t,
228 None => return ApiError::AuthenticationRequired.into_response(),
229 };
230 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
231 let http_uri = format!("https://{}/xrpc/com.atproto.server.requestAccountDelete",
232 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
233 let did = match crate::auth::validate_token_with_dpop(
234 &state.db,
235 &extracted.token,
236 extracted.is_dpop,
237 dpop_proof,
238 "POST",
239 &http_uri,
240 true,
241 ).await {
242 Ok(user) => user.did,
243 Err(e) => return ApiError::from(e).into_response(),
244 };
245 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
246 .fetch_optional(&state.db)
247 .await
248 {
249 Ok(Some(id)) => id,
250 _ => {
251 return (
252 StatusCode::INTERNAL_SERVER_ERROR,
253 Json(json!({"error": "InternalError"})),
254 )
255 .into_response();
256 }
257 };
258 let confirmation_token = Uuid::new_v4().to_string();
259 let expires_at = Utc::now() + Duration::minutes(15);
260 let insert = sqlx::query!(
261 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
262 confirmation_token,
263 did,
264 expires_at
265 )
266 .execute(&state.db)
267 .await;
268 if let Err(e) = insert {
269 error!("DB error creating deletion token: {:?}", e);
270 return (
271 StatusCode::INTERNAL_SERVER_ERROR,
272 Json(json!({"error": "InternalError"})),
273 )
274 .into_response();
275 }
276 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
277 if let Err(e) =
278 crate::notifications::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname).await
279 {
280 warn!("Failed to enqueue account deletion notification: {:?}", e);
281 }
282 info!("Account deletion requested for user {}", did);
283 (StatusCode::OK, Json(json!({}))).into_response()
284}
285
286#[derive(Deserialize)]
287pub struct DeleteAccountInput {
288 pub did: String,
289 pub password: String,
290 pub token: String,
291}
292
293pub async fn delete_account(
294 State(state): State<AppState>,
295 Json(input): Json<DeleteAccountInput>,
296) -> Response {
297 let did = input.did.trim();
298 let password = &input.password;
299 let token = input.token.trim();
300 if did.is_empty() {
301 return (
302 StatusCode::BAD_REQUEST,
303 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
304 )
305 .into_response();
306 }
307 if password.is_empty() {
308 return (
309 StatusCode::BAD_REQUEST,
310 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
311 )
312 .into_response();
313 }
314 if token.is_empty() {
315 return (
316 StatusCode::BAD_REQUEST,
317 Json(json!({"error": "InvalidToken", "message": "token is required"})),
318 )
319 .into_response();
320 }
321 let user = sqlx::query!(
322 "SELECT id, password_hash, handle FROM users WHERE did = $1",
323 did
324 )
325 .fetch_optional(&state.db)
326 .await;
327 let (user_id, password_hash, handle) = match user {
328 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
329 Ok(None) => {
330 return (
331 StatusCode::BAD_REQUEST,
332 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
333 )
334 .into_response();
335 }
336 Err(e) => {
337 error!("DB error in delete_account: {:?}", e);
338 return (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 Json(json!({"error": "InternalError"})),
341 )
342 .into_response();
343 }
344 };
345 let password_valid = if verify(password, &password_hash).unwrap_or(false) {
346 true
347 } else {
348 let app_pass_rows = sqlx::query!(
349 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
350 user_id
351 )
352 .fetch_all(&state.db)
353 .await
354 .unwrap_or_default();
355 app_pass_rows
356 .iter()
357 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
358 };
359 if !password_valid {
360 return (
361 StatusCode::UNAUTHORIZED,
362 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
363 )
364 .into_response();
365 }
366 let deletion_request = sqlx::query!(
367 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
368 token
369 )
370 .fetch_optional(&state.db)
371 .await;
372 let (token_did, expires_at) = match deletion_request {
373 Ok(Some(row)) => (row.did, row.expires_at),
374 Ok(None) => {
375 return (
376 StatusCode::BAD_REQUEST,
377 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
378 )
379 .into_response();
380 }
381 Err(e) => {
382 error!("DB error fetching deletion token: {:?}", e);
383 return (
384 StatusCode::INTERNAL_SERVER_ERROR,
385 Json(json!({"error": "InternalError"})),
386 )
387 .into_response();
388 }
389 };
390 if token_did != did {
391 return (
392 StatusCode::BAD_REQUEST,
393 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
394 )
395 .into_response();
396 }
397 if Utc::now() > expires_at {
398 let _ = sqlx::query!("DELETE FROM account_deletion_requests WHERE token = $1", token)
399 .execute(&state.db)
400 .await;
401 return (
402 StatusCode::BAD_REQUEST,
403 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
404 )
405 .into_response();
406 }
407 let mut tx = match state.db.begin().await {
408 Ok(tx) => tx,
409 Err(e) => {
410 error!("Failed to begin transaction: {:?}", e);
411 return (
412 StatusCode::INTERNAL_SERVER_ERROR,
413 Json(json!({"error": "InternalError"})),
414 )
415 .into_response();
416 }
417 };
418 let deletion_result: Result<(), sqlx::Error> = async {
419 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
420 .execute(&mut *tx)
421 .await?;
422 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
423 .execute(&mut *tx)
424 .await?;
425 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
426 .execute(&mut *tx)
427 .await?;
428 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
429 .execute(&mut *tx)
430 .await?;
431 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
432 .execute(&mut *tx)
433 .await?;
434 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
435 .execute(&mut *tx)
436 .await?;
437 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
438 .execute(&mut *tx)
439 .await?;
440 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
441 .execute(&mut *tx)
442 .await?;
443 Ok(())
444 }
445 .await;
446 match deletion_result {
447 Ok(()) => {
448 if let Err(e) = tx.commit().await {
449 error!("Failed to commit account deletion transaction: {:?}", e);
450 return (
451 StatusCode::INTERNAL_SERVER_ERROR,
452 Json(json!({"error": "InternalError"})),
453 )
454 .into_response();
455 }
456 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
457 info!("Account {} deleted successfully", did);
458 (StatusCode::OK, Json(json!({}))).into_response()
459 }
460 Err(e) => {
461 error!("DB error deleting account, rolling back: {:?}", e);
462 (
463 StatusCode::INTERNAL_SERVER_ERROR,
464 Json(json!({"error": "InternalError"})),
465 )
466 .into_response()
467 }
468 }
469}