this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub did_doc: Option<serde_json::Value>,
61 pub access_jwt: String,
62 pub refresh_jwt: String,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 let is_potential_migration = input
73 .did
74 .as_ref()
75 .map(|d| d.starts_with("did:plc:"))
76 .unwrap_or(false);
77 if is_potential_migration {
78 info!(
79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
80 input.did, input.handle
81 );
82 } else {
83 info!("create_account called");
84 }
85 let client_ip = extract_client_ip(&headers);
86 if !state
87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
88 .await
89 {
90 warn!(ip = %client_ip, "Account creation rate limit exceeded");
91 return (
92 StatusCode::TOO_MANY_REQUESTS,
93 Json(json!({
94 "error": "RateLimitExceeded",
95 "message": "Too many account creation attempts. Please try again later."
96 })),
97 )
98 .into_response();
99 }
100
101 let migration_auth = if let Some(token) =
102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
103 {
104 if is_service_token(&token) {
105 let verifier = ServiceTokenVerifier::new();
106 match verifier
107 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
108 .await
109 {
110 Ok(claims) => {
111 debug!("Service token verified for migration: iss={}", claims.iss);
112 Some(claims.iss)
113 }
114 Err(e) => {
115 error!("Service token verification failed: {:?}", e);
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({
119 "error": "AuthenticationFailed",
120 "message": format!("Service token verification failed: {}", e)
121 })),
122 )
123 .into_response();
124 }
125 }
126 } else {
127 None
128 }
129 } else {
130 None
131 };
132
133 let is_did_web_byod = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:web:"))
138 .unwrap_or(false);
139
140 let is_migration = migration_auth.is_some()
141 && input
142 .did
143 .as_ref()
144 .map(|d| d.starts_with("did:plc:"))
145 .unwrap_or(false);
146
147 if (is_migration || is_did_web_byod)
148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
149 {
150 if provided_did != auth_did {
151 info!(
152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
153 auth_did, provided_did
154 );
155 return (
156 StatusCode::FORBIDDEN,
157 Json(json!({
158 "error": "AuthorizationError",
159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
160 })),
161 )
162 .into_response();
163 }
164 if is_did_web_byod {
165 info!(did = %provided_did, "Processing did:web BYOD account creation");
166 } else {
167 info!(
168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
169 provided_did
170 );
171 }
172 }
173
174 let hostname_for_validation =
175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let pds_suffix = format!(".{}", hostname_for_validation);
177
178 let validated_short_handle = if !input.handle.contains('.')
179 || input.handle.ends_with(&pds_suffix)
180 {
181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
182 input
183 .handle
184 .strip_suffix(&pds_suffix)
185 .unwrap_or(&input.handle)
186 } else {
187 &input.handle
188 };
189 match crate::api::validation::validate_short_handle(handle_to_validate) {
190 Ok(h) => h,
191 Err(crate::api::validation::HandleValidationError::Reserved) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "HandleNotAvailable", "message": "Reserved handle"})),
195 )
196 .into_response();
197 }
198 Err(e) => {
199 return (
200 StatusCode::BAD_REQUEST,
201 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
202 )
203 .into_response();
204 }
205 }
206 } else {
207 if input.handle.contains(' ') || input.handle.contains('\t') {
208 return (
209 StatusCode::BAD_REQUEST,
210 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
211 )
212 .into_response();
213 }
214 for c in input.handle.chars() {
215 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
216 return (
217 StatusCode::BAD_REQUEST,
218 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
219 )
220 .into_response();
221 }
222 }
223 let handle_lower = input.handle.to_lowercase();
224 if crate::moderation::has_explicit_slur(&handle_lower) {
225 return (
226 StatusCode::BAD_REQUEST,
227 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
228 )
229 .into_response();
230 }
231 handle_lower
232 };
233 let email: Option<String> = input
234 .email
235 .as_ref()
236 .map(|e| e.trim().to_string())
237 .filter(|e| !e.is_empty());
238 if let Some(ref email) = email
239 && !crate::api::validation::is_valid_email(email)
240 {
241 return (
242 StatusCode::BAD_REQUEST,
243 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
244 )
245 .into_response();
246 }
247 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
248 let valid_channels = ["email", "discord", "telegram", "signal"];
249 if !valid_channels.contains(&verification_channel) && !is_migration {
250 return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
253 )
254 .into_response();
255 }
256 let verification_recipient = if is_migration {
257 None
258 } else {
259 Some(match verification_channel {
260 "email" => match &input.email {
261 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
265 ).into_response(),
266 },
267 "discord" => match &input.discord_id {
268 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
269 _ => return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
272 ).into_response(),
273 },
274 "telegram" => match &input.telegram_username {
275 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
276 _ => return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
279 ).into_response(),
280 },
281 "signal" => match &input.signal_number {
282 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
283 _ => return (
284 StatusCode::BAD_REQUEST,
285 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
286 ).into_response(),
287 },
288 _ => return (
289 StatusCode::BAD_REQUEST,
290 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
291 ).into_response(),
292 })
293 };
294 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
295 let pds_endpoint = format!("https://{}", hostname);
296 let suffix = format!(".{}", hostname);
297 let handle = if input.handle.ends_with(&suffix) {
298 format!("{}.{}", validated_short_handle, hostname)
299 } else if input.handle.contains('.') {
300 validated_short_handle.clone()
301 } else {
302 format!("{}.{}", validated_short_handle, hostname)
303 };
304 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
305 if let Some(signing_key_did) = &input.signing_key {
306 let reserved = sqlx::query!(
307 r#"
308 SELECT id, private_key_bytes
309 FROM reserved_signing_keys
310 WHERE public_key_did_key = $1
311 AND used_at IS NULL
312 AND expires_at > NOW()
313 FOR UPDATE
314 "#,
315 signing_key_did
316 )
317 .fetch_optional(&state.db)
318 .await;
319 match reserved {
320 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
321 Ok(None) => {
322 return (
323 StatusCode::BAD_REQUEST,
324 Json(json!({
325 "error": "InvalidSigningKey",
326 "message": "Signing key not found, already used, or expired"
327 })),
328 )
329 .into_response();
330 }
331 Err(e) => {
332 error!("Error looking up reserved signing key: {:?}", e);
333 return (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(json!({"error": "InternalError"})),
336 )
337 .into_response();
338 }
339 }
340 } else {
341 let secret_key = SecretKey::random(&mut OsRng);
342 (secret_key.to_bytes().to_vec(), None)
343 };
344 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
345 Ok(k) => k,
346 Err(e) => {
347 error!("Error creating signing key: {:?}", e);
348 return (
349 StatusCode::INTERNAL_SERVER_ERROR,
350 Json(json!({"error": "InternalError"})),
351 )
352 .into_response();
353 }
354 };
355 let did_type = input.did_type.as_deref().unwrap_or("plc");
356 let did = match did_type {
357 "web" => {
358 if !crate::api::server::meta::is_self_hosted_did_web_enabled() {
359 return (
360 StatusCode::BAD_REQUEST,
361 Json(json!({
362 "error": "SelfHostedDidWebDisabled",
363 "message": "This PDS does not offer self-hosted did:web identities. Please use did:plc or bring your own did:web."
364 })),
365 )
366 .into_response();
367 }
368 let subdomain_host = format!("{}.{}", input.handle, hostname);
369 let encoded_subdomain = subdomain_host.replace(':', "%3A");
370 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
371 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
372 self_hosted_did
373 }
374 "web-external" => {
375 let d = match &input.did {
376 Some(d) if !d.trim().is_empty() => d,
377 _ => {
378 return (
379 StatusCode::BAD_REQUEST,
380 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
381 )
382 .into_response();
383 }
384 };
385 if !d.starts_with("did:web:") {
386 return (
387 StatusCode::BAD_REQUEST,
388 Json(
389 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
390 ),
391 )
392 .into_response();
393 }
394 if !is_did_web_byod
395 && let Err(e) =
396 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
397 {
398 return (
399 StatusCode::BAD_REQUEST,
400 Json(json!({"error": "InvalidDid", "message": e})),
401 )
402 .into_response();
403 }
404 info!(did = %d, "Creating external did:web account");
405 d.clone()
406 }
407 _ => {
408 if let Some(d) = &input.did {
409 if d.starts_with("did:plc:") && is_migration {
410 info!(did = %d, "Migration with existing did:plc");
411 d.clone()
412 } else if d.starts_with("did:web:") {
413 if !is_did_web_byod
414 && let Err(e) = verify_did_web(
415 d,
416 &hostname,
417 &input.handle,
418 input.signing_key.as_deref(),
419 )
420 .await
421 {
422 return (
423 StatusCode::BAD_REQUEST,
424 Json(json!({"error": "InvalidDid", "message": e})),
425 )
426 .into_response();
427 }
428 d.clone()
429 } else if !d.trim().is_empty() {
430 return (
431 StatusCode::BAD_REQUEST,
432 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
433 )
434 .into_response();
435 } else {
436 let rotation_key = std::env::var("PLC_ROTATION_KEY")
437 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
438 let genesis_result = match create_genesis_operation(
439 &signing_key,
440 &rotation_key,
441 &handle,
442 &pds_endpoint,
443 ) {
444 Ok(r) => r,
445 Err(e) => {
446 error!("Error creating PLC genesis operation: {:?}", e);
447 return (
448 StatusCode::INTERNAL_SERVER_ERROR,
449 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
450 )
451 .into_response();
452 }
453 };
454 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
455 if let Err(e) = plc_client
456 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
457 .await
458 {
459 error!("Failed to submit PLC genesis operation: {:?}", e);
460 return (
461 StatusCode::BAD_GATEWAY,
462 Json(json!({
463 "error": "UpstreamError",
464 "message": format!("Failed to register DID with PLC directory: {}", e)
465 })),
466 )
467 .into_response();
468 }
469 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
470 genesis_result.did
471 }
472 } else {
473 let rotation_key = std::env::var("PLC_ROTATION_KEY")
474 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
475 let genesis_result = match create_genesis_operation(
476 &signing_key,
477 &rotation_key,
478 &handle,
479 &pds_endpoint,
480 ) {
481 Ok(r) => r,
482 Err(e) => {
483 error!("Error creating PLC genesis operation: {:?}", e);
484 return (
485 StatusCode::INTERNAL_SERVER_ERROR,
486 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
487 )
488 .into_response();
489 }
490 };
491 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
492 if let Err(e) = plc_client
493 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
494 .await
495 {
496 error!("Failed to submit PLC genesis operation: {:?}", e);
497 return (
498 StatusCode::BAD_GATEWAY,
499 Json(json!({
500 "error": "UpstreamError",
501 "message": format!("Failed to register DID with PLC directory: {}", e)
502 })),
503 )
504 .into_response();
505 }
506 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
507 genesis_result.did
508 }
509 }
510 };
511 let mut tx = match state.db.begin().await {
512 Ok(tx) => tx,
513 Err(e) => {
514 error!("Error starting transaction: {:?}", e);
515 return (
516 StatusCode::INTERNAL_SERVER_ERROR,
517 Json(json!({"error": "InternalError"})),
518 )
519 .into_response();
520 }
521 };
522 if is_migration {
523 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
524 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
525 .bind(&did)
526 .fetch_optional(&mut *tx)
527 .await
528 .unwrap_or(None);
529 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
530 if deactivated_at.is_some() {
531 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
532 let update_result: Result<_, sqlx::Error> =
533 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
534 .bind(&handle)
535 .bind(account_id)
536 .execute(&mut *tx)
537 .await;
538 if let Err(e) = update_result {
539 if let Some(db_err) = e.as_database_error()
540 && db_err
541 .constraint()
542 .map(|c| c.contains("handle"))
543 .unwrap_or(false)
544 {
545 return (
546 StatusCode::BAD_REQUEST,
547 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
548 )
549 .into_response();
550 }
551 error!("Error reactivating account: {:?}", e);
552 return (
553 StatusCode::INTERNAL_SERVER_ERROR,
554 Json(json!({"error": "InternalError"})),
555 )
556 .into_response();
557 }
558 if let Err(e) = tx.commit().await {
559 error!("Error committing reactivation: {:?}", e);
560 return (
561 StatusCode::INTERNAL_SERVER_ERROR,
562 Json(json!({"error": "InternalError"})),
563 )
564 .into_response();
565 }
566 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
567 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
568 )
569 .bind(account_id)
570 .fetch_optional(&state.db)
571 .await
572 .unwrap_or(None);
573 let secret_key_bytes = match key_row {
574 Some((key_bytes, encryption_version)) => {
575 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
576 Ok(k) => k,
577 Err(e) => {
578 error!("Error decrypting key for reactivated account: {:?}", e);
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError"})),
582 )
583 .into_response();
584 }
585 }
586 }
587 None => {
588 error!("No signing key found for reactivated account");
589 return (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
592 )
593 .into_response();
594 }
595 };
596 let access_meta =
597 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
598 Ok(m) => m,
599 Err(e) => {
600 error!("Error creating access token: {:?}", e);
601 return (
602 StatusCode::INTERNAL_SERVER_ERROR,
603 Json(json!({"error": "InternalError"})),
604 )
605 .into_response();
606 }
607 };
608 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
609 &did,
610 &secret_key_bytes,
611 ) {
612 Ok(m) => m,
613 Err(e) => {
614 error!("Error creating refresh token: {:?}", e);
615 return (
616 StatusCode::INTERNAL_SERVER_ERROR,
617 Json(json!({"error": "InternalError"})),
618 )
619 .into_response();
620 }
621 };
622 let session_result: Result<_, sqlx::Error> = sqlx::query(
623 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
624 )
625 .bind(&did)
626 .bind(&access_meta.jti)
627 .bind(&refresh_meta.jti)
628 .bind(access_meta.expires_at)
629 .bind(refresh_meta.expires_at)
630 .execute(&state.db)
631 .await;
632 if let Err(e) = session_result {
633 error!("Error creating session: {:?}", e);
634 return (
635 StatusCode::INTERNAL_SERVER_ERROR,
636 Json(json!({"error": "InternalError"})),
637 )
638 .into_response();
639 }
640 return (
641 StatusCode::OK,
642 Json(CreateAccountOutput {
643 handle: handle.clone(),
644 did: did.clone(),
645 did_doc: state.did_resolver.resolve_did_document(&did).await,
646 access_jwt: access_meta.token,
647 refresh_jwt: refresh_meta.token,
648 verification_required: false,
649 verification_channel: "email".to_string(),
650 }),
651 )
652 .into_response();
653 } else {
654 return (
655 StatusCode::BAD_REQUEST,
656 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
657 )
658 .into_response();
659 }
660 }
661 }
662 let exists_result: Option<(i32,)> =
663 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
664 .bind(&handle)
665 .fetch_optional(&mut *tx)
666 .await
667 .unwrap_or(None);
668 if exists_result.is_some() {
669 return (
670 StatusCode::BAD_REQUEST,
671 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
672 )
673 .into_response();
674 }
675 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
676 .map(|v| v == "true" || v == "1")
677 .unwrap_or(false);
678 if invite_code_required
679 && input
680 .invite_code
681 .as_ref()
682 .map(|c| c.trim().is_empty())
683 .unwrap_or(true)
684 {
685 return (
686 StatusCode::BAD_REQUEST,
687 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
688 )
689 .into_response();
690 }
691 if let Some(code) = &input.invite_code
692 && !code.trim().is_empty()
693 {
694 let invite_query = sqlx::query!(
695 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
696 code
697 )
698 .fetch_optional(&mut *tx)
699 .await;
700 match invite_query {
701 Ok(Some(row)) => {
702 if row.available_uses <= 0 {
703 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
704 }
705 let update_invite = sqlx::query!(
706 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
707 code
708 )
709 .execute(&mut *tx)
710 .await;
711 if let Err(e) = update_invite {
712 error!("Error updating invite code: {:?}", e);
713 return (
714 StatusCode::INTERNAL_SERVER_ERROR,
715 Json(json!({"error": "InternalError"})),
716 )
717 .into_response();
718 }
719 }
720 Ok(None) => {
721 return (
722 StatusCode::BAD_REQUEST,
723 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
724 )
725 .into_response();
726 }
727 Err(e) => {
728 error!("Error checking invite code: {:?}", e);
729 return (
730 StatusCode::INTERNAL_SERVER_ERROR,
731 Json(json!({"error": "InternalError"})),
732 )
733 .into_response();
734 }
735 }
736 }
737 if let Err(e) = validate_password(&input.password) {
738 return (
739 StatusCode::BAD_REQUEST,
740 Json(json!({
741 "error": "InvalidPassword",
742 "message": e.to_string()
743 })),
744 )
745 .into_response();
746 }
747
748 let password_clone = input.password.clone();
749 let password_hash =
750 match tokio::task::spawn_blocking(move || hash(&password_clone, DEFAULT_COST)).await {
751 Ok(Ok(h)) => h,
752 Ok(Err(e)) => {
753 error!("Error hashing password: {:?}", e);
754 return (
755 StatusCode::INTERNAL_SERVER_ERROR,
756 Json(json!({"error": "InternalError"})),
757 )
758 .into_response();
759 }
760 Err(e) => {
761 error!("Failed to spawn blocking task: {:?}", e);
762 return (
763 StatusCode::INTERNAL_SERVER_ERROR,
764 Json(json!({"error": "InternalError"})),
765 )
766 .into_response();
767 }
768 };
769 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
770 .fetch_one(&mut *tx)
771 .await
772 .map(|c| c.unwrap_or(0) == 0)
773 .unwrap_or(false);
774 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
775 Some(chrono::Utc::now())
776 } else {
777 None
778 };
779 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
780 r#"INSERT INTO users (
781 handle, email, did, password_hash,
782 preferred_comms_channel,
783 discord_id, telegram_username, signal_number,
784 is_admin, deactivated_at, email_verified
785 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
786 )
787 .bind(&handle)
788 .bind(&email)
789 .bind(&did)
790 .bind(&password_hash)
791 .bind(verification_channel)
792 .bind(
793 input
794 .discord_id
795 .as_deref()
796 .map(|s| s.trim())
797 .filter(|s| !s.is_empty()),
798 )
799 .bind(
800 input
801 .telegram_username
802 .as_deref()
803 .map(|s| s.trim())
804 .filter(|s| !s.is_empty()),
805 )
806 .bind(
807 input
808 .signal_number
809 .as_deref()
810 .map(|s| s.trim())
811 .filter(|s| !s.is_empty()),
812 )
813 .bind(is_first_user)
814 .bind(deactivated_at)
815 .bind(false)
816 .fetch_one(&mut *tx)
817 .await;
818 let user_id = match user_insert {
819 Ok((id,)) => id,
820 Err(e) => {
821 if let Some(db_err) = e.as_database_error()
822 && db_err.code().as_deref() == Some("23505")
823 {
824 let constraint = db_err.constraint().unwrap_or("");
825 if constraint.contains("handle") || constraint.contains("users_handle") {
826 return (
827 StatusCode::BAD_REQUEST,
828 Json(json!({
829 "error": "HandleNotAvailable",
830 "message": "Handle already taken"
831 })),
832 )
833 .into_response();
834 } else if constraint.contains("email") || constraint.contains("users_email") {
835 return (
836 StatusCode::BAD_REQUEST,
837 Json(json!({
838 "error": "InvalidEmail",
839 "message": "Email already registered"
840 })),
841 )
842 .into_response();
843 } else if constraint.contains("did") || constraint.contains("users_did") {
844 return (
845 StatusCode::BAD_REQUEST,
846 Json(json!({
847 "error": "AccountAlreadyExists",
848 "message": "An account with this DID already exists"
849 })),
850 )
851 .into_response();
852 }
853 }
854 error!("Error inserting user: {:?}", e);
855 return (
856 StatusCode::INTERNAL_SERVER_ERROR,
857 Json(json!({"error": "InternalError"})),
858 )
859 .into_response();
860 }
861 };
862
863 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
864 Ok(enc) => enc,
865 Err(e) => {
866 error!("Error encrypting user key: {:?}", e);
867 return (
868 StatusCode::INTERNAL_SERVER_ERROR,
869 Json(json!({"error": "InternalError"})),
870 )
871 .into_response();
872 }
873 };
874 let key_insert = sqlx::query!(
875 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
876 user_id,
877 &encrypted_key_bytes[..],
878 crate::config::ENCRYPTION_VERSION
879 )
880 .execute(&mut *tx)
881 .await;
882 if let Err(e) = key_insert {
883 error!("Error inserting user key: {:?}", e);
884 return (
885 StatusCode::INTERNAL_SERVER_ERROR,
886 Json(json!({"error": "InternalError"})),
887 )
888 .into_response();
889 }
890 if let Some(key_id) = reserved_key_id {
891 let mark_used = sqlx::query!(
892 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
893 key_id
894 )
895 .execute(&mut *tx)
896 .await;
897 if let Err(e) = mark_used {
898 error!("Error marking reserved key as used: {:?}", e);
899 return (
900 StatusCode::INTERNAL_SERVER_ERROR,
901 Json(json!({"error": "InternalError"})),
902 )
903 .into_response();
904 }
905 }
906 let mst = Mst::new(Arc::new(state.block_store.clone()));
907 let mst_root = match mst.persist().await {
908 Ok(c) => c,
909 Err(e) => {
910 error!("Error persisting MST: {:?}", e);
911 return (
912 StatusCode::INTERNAL_SERVER_ERROR,
913 Json(json!({"error": "InternalError"})),
914 )
915 .into_response();
916 }
917 };
918 let rev = Tid::now(LimitedU32::MIN);
919 let (commit_bytes, _sig) =
920 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
921 Ok(result) => result,
922 Err(e) => {
923 error!("Error creating genesis commit: {:?}", e);
924 return (
925 StatusCode::INTERNAL_SERVER_ERROR,
926 Json(json!({"error": "InternalError"})),
927 )
928 .into_response();
929 }
930 };
931 let commit_cid = match state.block_store.put(&commit_bytes).await {
932 Ok(c) => c,
933 Err(e) => {
934 error!("Error saving genesis commit: {:?}", e);
935 return (
936 StatusCode::INTERNAL_SERVER_ERROR,
937 Json(json!({"error": "InternalError"})),
938 )
939 .into_response();
940 }
941 };
942 let commit_cid_str = commit_cid.to_string();
943 let rev_str = rev.as_ref().to_string();
944 let repo_insert = sqlx::query!(
945 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
946 user_id,
947 commit_cid_str,
948 rev_str
949 )
950 .execute(&mut *tx)
951 .await;
952 if let Err(e) = repo_insert {
953 error!("Error initializing repo: {:?}", e);
954 return (
955 StatusCode::INTERNAL_SERVER_ERROR,
956 Json(json!({"error": "InternalError"})),
957 )
958 .into_response();
959 }
960 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
961 if let Err(e) = sqlx::query!(
962 r#"
963 INSERT INTO user_blocks (user_id, block_cid)
964 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
965 ON CONFLICT (user_id, block_cid) DO NOTHING
966 "#,
967 user_id,
968 &genesis_block_cids
969 )
970 .execute(&mut *tx)
971 .await
972 {
973 error!("Error inserting user_blocks: {:?}", e);
974 return (
975 StatusCode::INTERNAL_SERVER_ERROR,
976 Json(json!({"error": "InternalError"})),
977 )
978 .into_response();
979 }
980 if let Some(code) = &input.invite_code
981 && !code.trim().is_empty()
982 {
983 let use_insert = sqlx::query!(
984 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
985 code,
986 user_id
987 )
988 .execute(&mut *tx)
989 .await;
990 if let Err(e) = use_insert {
991 error!("Error recording invite usage: {:?}", e);
992 return (
993 StatusCode::INTERNAL_SERVER_ERROR,
994 Json(json!({"error": "InternalError"})),
995 )
996 .into_response();
997 }
998 }
999 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
1000 let birthdate_pref = json!({
1001 "$type": "app.bsky.actor.defs#personalDetailsPref",
1002 "birthDate": "1998-05-06T00:00:00.000Z"
1003 });
1004 if let Err(e) = sqlx::query!(
1005 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
1006 ON CONFLICT (user_id, name) DO NOTHING",
1007 user_id,
1008 "app.bsky.actor.defs#personalDetailsPref",
1009 birthdate_pref
1010 )
1011 .execute(&mut *tx)
1012 .await
1013 {
1014 warn!("Failed to set default birthdate preference: {:?}", e);
1015 }
1016 }
1017 if let Err(e) = tx.commit().await {
1018 error!("Error committing transaction: {:?}", e);
1019 return (
1020 StatusCode::INTERNAL_SERVER_ERROR,
1021 Json(json!({"error": "InternalError"})),
1022 )
1023 .into_response();
1024 }
1025 if !is_migration && !is_did_web_byod {
1026 if let Err(e) =
1027 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
1028 {
1029 warn!("Failed to sequence identity event for {}: {}", did, e);
1030 }
1031 if let Err(e) =
1032 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
1033 {
1034 warn!("Failed to sequence account event for {}: {}", did, e);
1035 }
1036 if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
1037 &state,
1038 &did,
1039 &commit_cid,
1040 &mst_root,
1041 &rev_str,
1042 )
1043 .await
1044 {
1045 warn!("Failed to sequence commit event for {}: {}", did, e);
1046 }
1047 if let Err(e) = crate::api::repo::record::sequence_sync_event(
1048 &state,
1049 &did,
1050 &commit_cid_str,
1051 Some(rev.as_ref()),
1052 )
1053 .await
1054 {
1055 warn!("Failed to sequence sync event for {}: {}", did, e);
1056 }
1057 let profile_record = json!({
1058 "$type": "app.bsky.actor.profile",
1059 "displayName": input.handle
1060 });
1061 if let Err(e) = crate::api::repo::record::create_record_internal(
1062 &state,
1063 &did,
1064 "app.bsky.actor.profile",
1065 "self",
1066 &profile_record,
1067 )
1068 .await
1069 {
1070 warn!("Failed to create default profile for {}: {}", did, e);
1071 }
1072 }
1073 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
1074 if !is_migration {
1075 if let Some(ref recipient) = verification_recipient {
1076 let verification_token = crate::auth::verification_token::generate_signup_token(
1077 &did,
1078 verification_channel,
1079 recipient,
1080 );
1081 let formatted_token =
1082 crate::auth::verification_token::format_token_for_display(&verification_token);
1083 if let Err(e) = crate::comms::enqueue_signup_verification(
1084 &state.db,
1085 user_id,
1086 verification_channel,
1087 recipient,
1088 &formatted_token,
1089 None,
1090 )
1091 .await
1092 {
1093 warn!(
1094 "Failed to enqueue signup verification notification: {:?}",
1095 e
1096 );
1097 }
1098 }
1099 } else if let Some(ref user_email) = email {
1100 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
1101 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
1102 if let Err(e) = crate::comms::enqueue_migration_verification(
1103 &state.db,
1104 user_id,
1105 user_email,
1106 &formatted_token,
1107 &hostname,
1108 )
1109 .await
1110 {
1111 warn!("Failed to enqueue migration verification email: {:?}", e);
1112 }
1113 }
1114
1115 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
1116 {
1117 Ok(m) => m,
1118 Err(e) => {
1119 error!("createAccount: Error creating access token: {:?}", e);
1120 return (
1121 StatusCode::INTERNAL_SERVER_ERROR,
1122 Json(json!({"error": "InternalError"})),
1123 )
1124 .into_response();
1125 }
1126 };
1127 let refresh_meta =
1128 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1129 Ok(m) => m,
1130 Err(e) => {
1131 error!("createAccount: Error creating refresh token: {:?}", e);
1132 return (
1133 StatusCode::INTERNAL_SERVER_ERROR,
1134 Json(json!({"error": "InternalError"})),
1135 )
1136 .into_response();
1137 }
1138 };
1139 if let Err(e) = sqlx::query!(
1140 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1141 did,
1142 access_meta.jti,
1143 refresh_meta.jti,
1144 access_meta.expires_at,
1145 refresh_meta.expires_at
1146 )
1147 .execute(&state.db)
1148 .await
1149 {
1150 error!("createAccount: Error creating session: {:?}", e);
1151 return (
1152 StatusCode::INTERNAL_SERVER_ERROR,
1153 Json(json!({"error": "InternalError"})),
1154 )
1155 .into_response();
1156 }
1157
1158 let did_doc = state.did_resolver.resolve_did_document(&did).await;
1159
1160 if is_migration {
1161 info!(
1162 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
1163 did, handle
1164 );
1165 }
1166
1167 (
1168 StatusCode::OK,
1169 Json(CreateAccountOutput {
1170 handle: handle.clone(),
1171 did,
1172 did_doc,
1173 access_jwt: access_meta.token,
1174 refresh_jwt: refresh_meta.token,
1175 verification_required: !is_migration,
1176 verification_channel: verification_channel.to_string(),
1177 }),
1178 )
1179 .into_response()
1180}