// this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use bcrypt::verify;
10use chrono::{Duration, Utc};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, info, warn};
14use uuid::Uuid;
15
16#[derive(Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct CheckAccountStatusOutput {
19 pub activated: bool,
20 pub valid_did: bool,
21 pub repo_commit: String,
22 pub repo_rev: String,
23 pub repo_blocks: i64,
24 pub indexed_records: i64,
25 pub private_state_values: i64,
26 pub expected_blobs: i64,
27 pub imported_blobs: i64,
28}
29
30pub async fn check_account_status(
31 State(state): State<AppState>,
32 headers: axum::http::HeaderMap,
33) -> Response {
34 let extracted = match crate::auth::extract_auth_token_from_header(
35 headers.get("Authorization").and_then(|h| h.to_str().ok()),
36 ) {
37 Some(t) => t,
38 None => return ApiError::AuthenticationRequired.into_response(),
39 };
40 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
41 let http_uri = format!(
42 "https://{}/xrpc/com.atproto.server.checkAccountStatus",
43 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
44 );
45 let did = match crate::auth::validate_token_with_dpop(
46 &state.db,
47 &extracted.token,
48 extracted.is_dpop,
49 dpop_proof,
50 "GET",
51 &http_uri,
52 true,
53 )
54 .await
55 {
56 Ok(user) => user.did,
57 Err(e) => return ApiError::from(e).into_response(),
58 };
59 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
60 .fetch_optional(&state.db)
61 .await
62 {
63 Ok(Some(id)) => id,
64 _ => {
65 return (
66 StatusCode::INTERNAL_SERVER_ERROR,
67 Json(json!({"error": "InternalError"})),
68 )
69 .into_response();
70 }
71 };
72 let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
73 .fetch_optional(&state.db)
74 .await;
75 let deactivated_at = match user_status {
76 Ok(Some(row)) => row.deactivated_at,
77 _ => None,
78 };
79 let repo_result = sqlx::query!(
80 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
81 user_id
82 )
83 .fetch_optional(&state.db)
84 .await;
85 let repo_commit = match repo_result {
86 Ok(Some(row)) => row.repo_root_cid,
87 _ => String::new(),
88 };
89 let record_count: i64 =
90 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
91 .fetch_one(&state.db)
92 .await
93 .unwrap_or(Some(0))
94 .unwrap_or(0);
95 let blob_count: i64 = sqlx::query_scalar!(
96 "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
97 user_id
98 )
99 .fetch_one(&state.db)
100 .await
101 .unwrap_or(Some(0))
102 .unwrap_or(0);
103 let valid_did = did.starts_with("did:");
104 (
105 StatusCode::OK,
106 Json(CheckAccountStatusOutput {
107 activated: deactivated_at.is_none(),
108 valid_did,
109 repo_commit: repo_commit.clone(),
110 repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
111 repo_blocks: 0,
112 indexed_records: record_count,
113 private_state_values: 0,
114 expected_blobs: blob_count,
115 imported_blobs: blob_count,
116 }),
117 )
118 .into_response()
119}
120
121pub async fn activate_account(
122 State(state): State<AppState>,
123 headers: axum::http::HeaderMap,
124) -> Response {
125 let extracted = match crate::auth::extract_auth_token_from_header(
126 headers.get("Authorization").and_then(|h| h.to_str().ok()),
127 ) {
128 Some(t) => t,
129 None => return ApiError::AuthenticationRequired.into_response(),
130 };
131 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
132 let http_uri = format!(
133 "https://{}/xrpc/com.atproto.server.activateAccount",
134 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
135 );
136 let auth_user = match crate::auth::validate_token_with_dpop(
137 &state.db,
138 &extracted.token,
139 extracted.is_dpop,
140 dpop_proof,
141 "POST",
142 &http_uri,
143 true,
144 )
145 .await
146 {
147 Ok(user) => user,
148 Err(e) => return ApiError::from(e).into_response(),
149 };
150
151 if let Err(e) = crate::auth::scope_check::check_account_scope(
152 auth_user.is_oauth,
153 auth_user.scope.as_deref(),
154 crate::oauth::scopes::AccountAttr::Repo,
155 crate::oauth::scopes::AccountAction::Manage,
156 ) {
157 return e;
158 }
159
160 let did = auth_user.did;
161 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
162 .fetch_optional(&state.db)
163 .await
164 .ok()
165 .flatten();
166 let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
167 .execute(&state.db)
168 .await;
169 match result {
170 Ok(_) => {
171 if let Some(ref h) = handle {
172 let _ = state.cache.delete(&format!("handle:{}", h)).await;
173 }
174 if let Err(e) =
175 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
176 {
177 warn!("Failed to sequence account activation event: {}", e);
178 }
179 if let Err(e) =
180 crate::api::repo::record::sequence_identity_event(&state, &did, handle.as_deref())
181 .await
182 {
183 warn!("Failed to sequence identity event for activation: {}", e);
184 }
185 if let Err(e) =
186 crate::api::repo::record::sequence_empty_commit_event(&state, &did).await
187 {
188 warn!(
189 "Failed to sequence empty commit event for activation: {}",
190 e
191 );
192 }
193 (StatusCode::OK, Json(json!({}))).into_response()
194 }
195 Err(e) => {
196 error!("DB error activating account: {:?}", e);
197 (
198 StatusCode::INTERNAL_SERVER_ERROR,
199 Json(json!({"error": "InternalError"})),
200 )
201 .into_response()
202 }
203 }
204}
205
206#[derive(Deserialize)]
207#[serde(rename_all = "camelCase")]
208pub struct DeactivateAccountInput {
209 pub delete_after: Option<String>,
210}
211
212pub async fn deactivate_account(
213 State(state): State<AppState>,
214 headers: axum::http::HeaderMap,
215 Json(_input): Json<DeactivateAccountInput>,
216) -> Response {
217 let extracted = match crate::auth::extract_auth_token_from_header(
218 headers.get("Authorization").and_then(|h| h.to_str().ok()),
219 ) {
220 Some(t) => t,
221 None => return ApiError::AuthenticationRequired.into_response(),
222 };
223 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
224 let http_uri = format!(
225 "https://{}/xrpc/com.atproto.server.deactivateAccount",
226 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
227 );
228 let auth_user = match crate::auth::validate_token_with_dpop(
229 &state.db,
230 &extracted.token,
231 extracted.is_dpop,
232 dpop_proof,
233 "POST",
234 &http_uri,
235 false,
236 )
237 .await
238 {
239 Ok(user) => user,
240 Err(e) => return ApiError::from(e).into_response(),
241 };
242
243 if let Err(e) = crate::auth::scope_check::check_account_scope(
244 auth_user.is_oauth,
245 auth_user.scope.as_deref(),
246 crate::oauth::scopes::AccountAttr::Repo,
247 crate::oauth::scopes::AccountAction::Manage,
248 ) {
249 return e;
250 }
251
252 let did = auth_user.did;
253 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
254 .fetch_optional(&state.db)
255 .await
256 .ok()
257 .flatten();
258 let result = sqlx::query!(
259 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
260 did
261 )
262 .execute(&state.db)
263 .await;
264 match result {
265 Ok(_) => {
266 if let Some(ref h) = handle {
267 let _ = state.cache.delete(&format!("handle:{}", h)).await;
268 }
269 if let Err(e) = crate::api::repo::record::sequence_account_event(
270 &state,
271 &did,
272 false,
273 Some("deactivated"),
274 )
275 .await
276 {
277 warn!("Failed to sequence account deactivation event: {}", e);
278 }
279 (StatusCode::OK, Json(json!({}))).into_response()
280 }
281 Err(e) => {
282 error!("DB error deactivating account: {:?}", e);
283 (
284 StatusCode::INTERNAL_SERVER_ERROR,
285 Json(json!({"error": "InternalError"})),
286 )
287 .into_response()
288 }
289 }
290}
291
292pub async fn request_account_delete(
293 State(state): State<AppState>,
294 headers: axum::http::HeaderMap,
295) -> Response {
296 let extracted = match crate::auth::extract_auth_token_from_header(
297 headers.get("Authorization").and_then(|h| h.to_str().ok()),
298 ) {
299 Some(t) => t,
300 None => return ApiError::AuthenticationRequired.into_response(),
301 };
302 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
303 let http_uri = format!(
304 "https://{}/xrpc/com.atproto.server.requestAccountDelete",
305 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
306 );
307 let did = match crate::auth::validate_token_with_dpop(
308 &state.db,
309 &extracted.token,
310 extracted.is_dpop,
311 dpop_proof,
312 "POST",
313 &http_uri,
314 true,
315 )
316 .await
317 {
318 Ok(user) => user.did,
319 Err(e) => return ApiError::from(e).into_response(),
320 };
321 let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
322 .fetch_optional(&state.db)
323 .await
324 {
325 Ok(Some(id)) => id,
326 _ => {
327 return (
328 StatusCode::INTERNAL_SERVER_ERROR,
329 Json(json!({"error": "InternalError"})),
330 )
331 .into_response();
332 }
333 };
334 let confirmation_token = Uuid::new_v4().to_string();
335 let expires_at = Utc::now() + Duration::minutes(15);
336 let insert = sqlx::query!(
337 "INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
338 confirmation_token,
339 did,
340 expires_at
341 )
342 .execute(&state.db)
343 .await;
344 if let Err(e) = insert {
345 error!("DB error creating deletion token: {:?}", e);
346 return (
347 StatusCode::INTERNAL_SERVER_ERROR,
348 Json(json!({"error": "InternalError"})),
349 )
350 .into_response();
351 }
352 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
353 if let Err(e) =
354 crate::comms::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname)
355 .await
356 {
357 warn!("Failed to enqueue account deletion notification: {:?}", e);
358 }
359 info!("Account deletion requested for user {}", did);
360 (StatusCode::OK, Json(json!({}))).into_response()
361}
362
363#[derive(Deserialize)]
364pub struct DeleteAccountInput {
365 pub did: String,
366 pub password: String,
367 pub token: String,
368}
369
370pub async fn delete_account(
371 State(state): State<AppState>,
372 Json(input): Json<DeleteAccountInput>,
373) -> Response {
374 let did = input.did.trim();
375 let password = &input.password;
376 let token = input.token.trim();
377 if did.is_empty() {
378 return (
379 StatusCode::BAD_REQUEST,
380 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
381 )
382 .into_response();
383 }
384 if password.is_empty() {
385 return (
386 StatusCode::BAD_REQUEST,
387 Json(json!({"error": "InvalidRequest", "message": "password is required"})),
388 )
389 .into_response();
390 }
391 if token.is_empty() {
392 return (
393 StatusCode::BAD_REQUEST,
394 Json(json!({"error": "InvalidToken", "message": "token is required"})),
395 )
396 .into_response();
397 }
398 let user = sqlx::query!(
399 "SELECT id, password_hash, handle FROM users WHERE did = $1",
400 did
401 )
402 .fetch_optional(&state.db)
403 .await;
404 let (user_id, password_hash, handle) = match user {
405 Ok(Some(row)) => (row.id, row.password_hash, row.handle),
406 Ok(None) => {
407 return (
408 StatusCode::BAD_REQUEST,
409 Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
410 )
411 .into_response();
412 }
413 Err(e) => {
414 error!("DB error in delete_account: {:?}", e);
415 return (
416 StatusCode::INTERNAL_SERVER_ERROR,
417 Json(json!({"error": "InternalError"})),
418 )
419 .into_response();
420 }
421 };
422 let password_valid = if password_hash
423 .as_ref()
424 .map(|h| verify(password, h).unwrap_or(false))
425 .unwrap_or(false)
426 {
427 true
428 } else {
429 let app_pass_rows = sqlx::query!(
430 "SELECT password_hash FROM app_passwords WHERE user_id = $1",
431 user_id
432 )
433 .fetch_all(&state.db)
434 .await
435 .unwrap_or_default();
436 app_pass_rows
437 .iter()
438 .any(|row| verify(password, &row.password_hash).unwrap_or(false))
439 };
440 if !password_valid {
441 return (
442 StatusCode::UNAUTHORIZED,
443 Json(json!({"error": "AuthenticationFailed", "message": "Invalid password"})),
444 )
445 .into_response();
446 }
447 let deletion_request = sqlx::query!(
448 "SELECT did, expires_at FROM account_deletion_requests WHERE token = $1",
449 token
450 )
451 .fetch_optional(&state.db)
452 .await;
453 let (token_did, expires_at) = match deletion_request {
454 Ok(Some(row)) => (row.did, row.expires_at),
455 Ok(None) => {
456 return (
457 StatusCode::BAD_REQUEST,
458 Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
459 )
460 .into_response();
461 }
462 Err(e) => {
463 error!("DB error fetching deletion token: {:?}", e);
464 return (
465 StatusCode::INTERNAL_SERVER_ERROR,
466 Json(json!({"error": "InternalError"})),
467 )
468 .into_response();
469 }
470 };
471 if token_did != did {
472 return (
473 StatusCode::BAD_REQUEST,
474 Json(json!({"error": "InvalidToken", "message": "Token does not match account"})),
475 )
476 .into_response();
477 }
478 if Utc::now() > expires_at {
479 let _ = sqlx::query!(
480 "DELETE FROM account_deletion_requests WHERE token = $1",
481 token
482 )
483 .execute(&state.db)
484 .await;
485 return (
486 StatusCode::BAD_REQUEST,
487 Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
488 )
489 .into_response();
490 }
491 let mut tx = match state.db.begin().await {
492 Ok(tx) => tx,
493 Err(e) => {
494 error!("Failed to begin transaction: {:?}", e);
495 return (
496 StatusCode::INTERNAL_SERVER_ERROR,
497 Json(json!({"error": "InternalError"})),
498 )
499 .into_response();
500 }
501 };
502 let deletion_result: Result<(), sqlx::Error> = async {
503 sqlx::query!("DELETE FROM session_tokens WHERE did = $1", did)
504 .execute(&mut *tx)
505 .await?;
506 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
507 .execute(&mut *tx)
508 .await?;
509 sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
510 .execute(&mut *tx)
511 .await?;
512 sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
513 .execute(&mut *tx)
514 .await?;
515 sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
516 .execute(&mut *tx)
517 .await?;
518 sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1", user_id)
519 .execute(&mut *tx)
520 .await?;
521 sqlx::query!("DELETE FROM account_deletion_requests WHERE did = $1", did)
522 .execute(&mut *tx)
523 .await?;
524 sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
525 .execute(&mut *tx)
526 .await?;
527 Ok(())
528 }
529 .await;
530 match deletion_result {
531 Ok(()) => {
532 if let Err(e) = tx.commit().await {
533 error!("Failed to commit account deletion transaction: {:?}", e);
534 return (
535 StatusCode::INTERNAL_SERVER_ERROR,
536 Json(json!({"error": "InternalError"})),
537 )
538 .into_response();
539 }
540 if let Err(e) = crate::api::repo::record::sequence_account_event(
541 &state,
542 did,
543 false,
544 Some("deleted"),
545 )
546 .await
547 {
548 warn!(
549 "Failed to sequence account deletion event for {}: {}",
550 did, e
551 );
552 }
553 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
554 info!("Account {} deleted successfully", did);
555 (StatusCode::OK, Json(json!({}))).into_response()
556 }
557 Err(e) => {
558 error!("DB error deleting account, rolling back: {:?}", e);
559 (
560 StatusCode::INTERNAL_SERVER_ERROR,
561 Json(json!({"error": "InternalError"})),
562 )
563 .into_response()
564 }
565 }
566}