this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub access_jwt: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub refresh_jwt: Option<String>,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 let is_potential_migration = input
73 .did
74 .as_ref()
75 .map(|d| d.starts_with("did:plc:"))
76 .unwrap_or(false);
77 if is_potential_migration {
78 info!(
79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
80 input.did, input.handle
81 );
82 } else {
83 info!("create_account called");
84 }
85 let client_ip = extract_client_ip(&headers);
86 if !state
87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
88 .await
89 {
90 warn!(ip = %client_ip, "Account creation rate limit exceeded");
91 return (
92 StatusCode::TOO_MANY_REQUESTS,
93 Json(json!({
94 "error": "RateLimitExceeded",
95 "message": "Too many account creation attempts. Please try again later."
96 })),
97 )
98 .into_response();
99 }
100
101 let migration_auth = if let Some(token) =
102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
103 {
104 if is_service_token(&token) {
105 let verifier = ServiceTokenVerifier::new();
106 match verifier
107 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
108 .await
109 {
110 Ok(claims) => {
111 debug!("Service token verified for migration: iss={}", claims.iss);
112 Some(claims.iss)
113 }
114 Err(e) => {
115 error!("Service token verification failed: {:?}", e);
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({
119 "error": "AuthenticationFailed",
120 "message": format!("Service token verification failed: {}", e)
121 })),
122 )
123 .into_response();
124 }
125 }
126 } else {
127 None
128 }
129 } else {
130 None
131 };
132
133 let is_did_web_byod = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:web:"))
138 .unwrap_or(false);
139
140 let is_migration = migration_auth.is_some()
141 && input
142 .did
143 .as_ref()
144 .map(|d| d.starts_with("did:plc:"))
145 .unwrap_or(false);
146
147 if (is_migration || is_did_web_byod)
148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
149 {
150 if provided_did != auth_did {
151 info!(
152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
153 auth_did, provided_did
154 );
155 return (
156 StatusCode::FORBIDDEN,
157 Json(json!({
158 "error": "AuthorizationError",
159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
160 })),
161 )
162 .into_response();
163 }
164 if is_did_web_byod {
165 info!(did = %provided_did, "Processing did:web BYOD account creation");
166 } else {
167 info!(
168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
169 provided_did
170 );
171 }
172 }
173
174 let hostname_for_validation =
175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let pds_suffix = format!(".{}", hostname_for_validation);
177
178 let validated_short_handle = if !input.handle.contains('.')
179 || input.handle.ends_with(&pds_suffix)
180 {
181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
182 input
183 .handle
184 .strip_suffix(&pds_suffix)
185 .unwrap_or(&input.handle)
186 } else {
187 &input.handle
188 };
189 match crate::api::validation::validate_short_handle(handle_to_validate) {
190 Ok(h) => h,
191 Err(e) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
195 )
196 .into_response();
197 }
198 }
199 } else {
200 if input.handle.contains(' ') || input.handle.contains('\t') {
201 return (
202 StatusCode::BAD_REQUEST,
203 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
204 )
205 .into_response();
206 }
207 for c in input.handle.chars() {
208 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
209 return (
210 StatusCode::BAD_REQUEST,
211 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
212 )
213 .into_response();
214 }
215 }
216 let handle_lower = input.handle.to_lowercase();
217 if crate::moderation::has_explicit_slur(&handle_lower) {
218 return (
219 StatusCode::BAD_REQUEST,
220 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
221 )
222 .into_response();
223 }
224 handle_lower
225 };
226 let email: Option<String> = input
227 .email
228 .as_ref()
229 .map(|e| e.trim().to_string())
230 .filter(|e| !e.is_empty());
231 if let Some(ref email) = email
232 && !crate::api::validation::is_valid_email(email)
233 {
234 return (
235 StatusCode::BAD_REQUEST,
236 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
237 )
238 .into_response();
239 }
240 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
241 let valid_channels = ["email", "discord", "telegram", "signal"];
242 if !valid_channels.contains(&verification_channel) && !is_migration {
243 return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
246 )
247 .into_response();
248 }
249 let verification_recipient = if is_migration {
250 None
251 } else {
252 Some(match verification_channel {
253 "email" => match &input.email {
254 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
255 _ => return (
256 StatusCode::BAD_REQUEST,
257 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
258 ).into_response(),
259 },
260 "discord" => match &input.discord_id {
261 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
265 ).into_response(),
266 },
267 "telegram" => match &input.telegram_username {
268 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
269 _ => return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
272 ).into_response(),
273 },
274 "signal" => match &input.signal_number {
275 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
276 _ => return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
279 ).into_response(),
280 },
281 _ => return (
282 StatusCode::BAD_REQUEST,
283 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
284 ).into_response(),
285 })
286 };
287 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
288 let pds_endpoint = format!("https://{}", hostname);
289 let suffix = format!(".{}", hostname);
290 let handle = if input.handle.ends_with(&suffix) {
291 format!("{}.{}", validated_short_handle, hostname)
292 } else if input.handle.contains('.') {
293 validated_short_handle.clone()
294 } else {
295 format!("{}.{}", validated_short_handle, hostname)
296 };
297 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
298 if let Some(signing_key_did) = &input.signing_key {
299 let reserved = sqlx::query!(
300 r#"
301 SELECT id, private_key_bytes
302 FROM reserved_signing_keys
303 WHERE public_key_did_key = $1
304 AND used_at IS NULL
305 AND expires_at > NOW()
306 FOR UPDATE
307 "#,
308 signing_key_did
309 )
310 .fetch_optional(&state.db)
311 .await;
312 match reserved {
313 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
314 Ok(None) => {
315 return (
316 StatusCode::BAD_REQUEST,
317 Json(json!({
318 "error": "InvalidSigningKey",
319 "message": "Signing key not found, already used, or expired"
320 })),
321 )
322 .into_response();
323 }
324 Err(e) => {
325 error!("Error looking up reserved signing key: {:?}", e);
326 return (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(json!({"error": "InternalError"})),
329 )
330 .into_response();
331 }
332 }
333 } else {
334 let secret_key = SecretKey::random(&mut OsRng);
335 (secret_key.to_bytes().to_vec(), None)
336 };
337 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
338 Ok(k) => k,
339 Err(e) => {
340 error!("Error creating signing key: {:?}", e);
341 return (
342 StatusCode::INTERNAL_SERVER_ERROR,
343 Json(json!({"error": "InternalError"})),
344 )
345 .into_response();
346 }
347 };
348 let did_type = input.did_type.as_deref().unwrap_or("plc");
349 let did = match did_type {
350 "web" => {
351 let subdomain_host = format!("{}.{}", input.handle, hostname);
352 let encoded_subdomain = subdomain_host.replace(':', "%3A");
353 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
354 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
355 self_hosted_did
356 }
357 "web-external" => {
358 let d = match &input.did {
359 Some(d) if !d.trim().is_empty() => d,
360 _ => {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
364 )
365 .into_response();
366 }
367 };
368 if !d.starts_with("did:web:") {
369 return (
370 StatusCode::BAD_REQUEST,
371 Json(
372 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
373 ),
374 )
375 .into_response();
376 }
377 if !is_did_web_byod
378 && let Err(e) =
379 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
380 {
381 return (
382 StatusCode::BAD_REQUEST,
383 Json(json!({"error": "InvalidDid", "message": e})),
384 )
385 .into_response();
386 }
387 info!(did = %d, "Creating external did:web account");
388 d.clone()
389 }
390 _ => {
391 if let Some(d) = &input.did {
392 if d.starts_with("did:plc:") && is_migration {
393 info!(did = %d, "Migration with existing did:plc");
394 d.clone()
395 } else if d.starts_with("did:web:") {
396 if !is_did_web_byod
397 && let Err(e) = verify_did_web(
398 d,
399 &hostname,
400 &input.handle,
401 input.signing_key.as_deref(),
402 )
403 .await
404 {
405 return (
406 StatusCode::BAD_REQUEST,
407 Json(json!({"error": "InvalidDid", "message": e})),
408 )
409 .into_response();
410 }
411 d.clone()
412 } else if !d.trim().is_empty() {
413 return (
414 StatusCode::BAD_REQUEST,
415 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
416 )
417 .into_response();
418 } else {
419 let rotation_key = std::env::var("PLC_ROTATION_KEY")
420 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
421 let genesis_result = match create_genesis_operation(
422 &signing_key,
423 &rotation_key,
424 &handle,
425 &pds_endpoint,
426 ) {
427 Ok(r) => r,
428 Err(e) => {
429 error!("Error creating PLC genesis operation: {:?}", e);
430 return (
431 StatusCode::INTERNAL_SERVER_ERROR,
432 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
433 )
434 .into_response();
435 }
436 };
437 let plc_client = PlcClient::new(None);
438 if let Err(e) = plc_client
439 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
440 .await
441 {
442 error!("Failed to submit PLC genesis operation: {:?}", e);
443 return (
444 StatusCode::BAD_GATEWAY,
445 Json(json!({
446 "error": "UpstreamError",
447 "message": format!("Failed to register DID with PLC directory: {}", e)
448 })),
449 )
450 .into_response();
451 }
452 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
453 genesis_result.did
454 }
455 } else {
456 let rotation_key = std::env::var("PLC_ROTATION_KEY")
457 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
458 let genesis_result = match create_genesis_operation(
459 &signing_key,
460 &rotation_key,
461 &handle,
462 &pds_endpoint,
463 ) {
464 Ok(r) => r,
465 Err(e) => {
466 error!("Error creating PLC genesis operation: {:?}", e);
467 return (
468 StatusCode::INTERNAL_SERVER_ERROR,
469 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
470 )
471 .into_response();
472 }
473 };
474 let plc_client = PlcClient::new(None);
475 if let Err(e) = plc_client
476 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
477 .await
478 {
479 error!("Failed to submit PLC genesis operation: {:?}", e);
480 return (
481 StatusCode::BAD_GATEWAY,
482 Json(json!({
483 "error": "UpstreamError",
484 "message": format!("Failed to register DID with PLC directory: {}", e)
485 })),
486 )
487 .into_response();
488 }
489 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
490 genesis_result.did
491 }
492 }
493 };
494 let mut tx = match state.db.begin().await {
495 Ok(tx) => tx,
496 Err(e) => {
497 error!("Error starting transaction: {:?}", e);
498 return (
499 StatusCode::INTERNAL_SERVER_ERROR,
500 Json(json!({"error": "InternalError"})),
501 )
502 .into_response();
503 }
504 };
505 if is_migration {
506 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
507 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
508 .bind(&did)
509 .fetch_optional(&mut *tx)
510 .await
511 .unwrap_or(None);
512 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
513 if deactivated_at.is_some() {
514 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
515 let update_result: Result<_, sqlx::Error> =
516 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
517 .bind(&handle)
518 .bind(account_id)
519 .execute(&mut *tx)
520 .await;
521 if let Err(e) = update_result {
522 if let Some(db_err) = e.as_database_error()
523 && db_err
524 .constraint()
525 .map(|c| c.contains("handle"))
526 .unwrap_or(false)
527 {
528 return (
529 StatusCode::BAD_REQUEST,
530 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
531 )
532 .into_response();
533 }
534 error!("Error reactivating account: {:?}", e);
535 return (
536 StatusCode::INTERNAL_SERVER_ERROR,
537 Json(json!({"error": "InternalError"})),
538 )
539 .into_response();
540 }
541 if let Err(e) = tx.commit().await {
542 error!("Error committing reactivation: {:?}", e);
543 return (
544 StatusCode::INTERNAL_SERVER_ERROR,
545 Json(json!({"error": "InternalError"})),
546 )
547 .into_response();
548 }
549 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
550 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
551 )
552 .bind(account_id)
553 .fetch_optional(&state.db)
554 .await
555 .unwrap_or(None);
556 let secret_key_bytes = match key_row {
557 Some((key_bytes, encryption_version)) => {
558 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
559 Ok(k) => k,
560 Err(e) => {
561 error!("Error decrypting key for reactivated account: {:?}", e);
562 return (
563 StatusCode::INTERNAL_SERVER_ERROR,
564 Json(json!({"error": "InternalError"})),
565 )
566 .into_response();
567 }
568 }
569 }
570 None => {
571 error!("No signing key found for reactivated account");
572 return (
573 StatusCode::INTERNAL_SERVER_ERROR,
574 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
575 )
576 .into_response();
577 }
578 };
579 let access_meta =
580 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
581 Ok(m) => m,
582 Err(e) => {
583 error!("Error creating access token: {:?}", e);
584 return (
585 StatusCode::INTERNAL_SERVER_ERROR,
586 Json(json!({"error": "InternalError"})),
587 )
588 .into_response();
589 }
590 };
591 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
592 &did,
593 &secret_key_bytes,
594 ) {
595 Ok(m) => m,
596 Err(e) => {
597 error!("Error creating refresh token: {:?}", e);
598 return (
599 StatusCode::INTERNAL_SERVER_ERROR,
600 Json(json!({"error": "InternalError"})),
601 )
602 .into_response();
603 }
604 };
605 let session_result: Result<_, sqlx::Error> = sqlx::query(
606 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
607 )
608 .bind(&did)
609 .bind(&access_meta.jti)
610 .bind(&refresh_meta.jti)
611 .bind(access_meta.expires_at)
612 .bind(refresh_meta.expires_at)
613 .execute(&state.db)
614 .await;
615 if let Err(e) = session_result {
616 error!("Error creating session: {:?}", e);
617 return (
618 StatusCode::INTERNAL_SERVER_ERROR,
619 Json(json!({"error": "InternalError"})),
620 )
621 .into_response();
622 }
623 return (
624 StatusCode::OK,
625 Json(CreateAccountOutput {
626 handle: handle.clone(),
627 did,
628 access_jwt: Some(access_meta.token),
629 refresh_jwt: Some(refresh_meta.token),
630 verification_required: false,
631 verification_channel: "email".to_string(),
632 }),
633 )
634 .into_response();
635 } else {
636 return (
637 StatusCode::BAD_REQUEST,
638 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
639 )
640 .into_response();
641 }
642 }
643 }
644 let exists_result: Option<(i32,)> =
645 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
646 .bind(&handle)
647 .fetch_optional(&mut *tx)
648 .await
649 .unwrap_or(None);
650 if exists_result.is_some() {
651 return (
652 StatusCode::BAD_REQUEST,
653 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
654 )
655 .into_response();
656 }
657 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
658 .map(|v| v == "true" || v == "1")
659 .unwrap_or(false);
660 if invite_code_required
661 && input
662 .invite_code
663 .as_ref()
664 .map(|c| c.trim().is_empty())
665 .unwrap_or(true)
666 {
667 return (
668 StatusCode::BAD_REQUEST,
669 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
670 )
671 .into_response();
672 }
673 if let Some(code) = &input.invite_code
674 && !code.trim().is_empty()
675 {
676 let invite_query = sqlx::query!(
677 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
678 code
679 )
680 .fetch_optional(&mut *tx)
681 .await;
682 match invite_query {
683 Ok(Some(row)) => {
684 if row.available_uses <= 0 {
685 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
686 }
687 let update_invite = sqlx::query!(
688 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
689 code
690 )
691 .execute(&mut *tx)
692 .await;
693 if let Err(e) = update_invite {
694 error!("Error updating invite code: {:?}", e);
695 return (
696 StatusCode::INTERNAL_SERVER_ERROR,
697 Json(json!({"error": "InternalError"})),
698 )
699 .into_response();
700 }
701 }
702 Ok(None) => {
703 return (
704 StatusCode::BAD_REQUEST,
705 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
706 )
707 .into_response();
708 }
709 Err(e) => {
710 error!("Error checking invite code: {:?}", e);
711 return (
712 StatusCode::INTERNAL_SERVER_ERROR,
713 Json(json!({"error": "InternalError"})),
714 )
715 .into_response();
716 }
717 }
718 }
719 if let Err(e) = validate_password(&input.password) {
720 return (
721 StatusCode::BAD_REQUEST,
722 Json(json!({
723 "error": "InvalidPassword",
724 "message": e.to_string()
725 })),
726 )
727 .into_response();
728 }
729
730 let password_hash = match hash(&input.password, DEFAULT_COST) {
731 Ok(h) => h,
732 Err(e) => {
733 error!("Error hashing password: {:?}", e);
734 return (
735 StatusCode::INTERNAL_SERVER_ERROR,
736 Json(json!({"error": "InternalError"})),
737 )
738 .into_response();
739 }
740 };
741 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
742 .fetch_one(&mut *tx)
743 .await
744 .map(|c| c.unwrap_or(0) == 0)
745 .unwrap_or(false);
746 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
747 Some(chrono::Utc::now())
748 } else {
749 None
750 };
751 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
752 r#"INSERT INTO users (
753 handle, email, did, password_hash,
754 preferred_comms_channel,
755 discord_id, telegram_username, signal_number,
756 is_admin, deactivated_at, email_verified
757 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
758 )
759 .bind(&handle)
760 .bind(&email)
761 .bind(&did)
762 .bind(&password_hash)
763 .bind(verification_channel)
764 .bind(
765 input
766 .discord_id
767 .as_deref()
768 .map(|s| s.trim())
769 .filter(|s| !s.is_empty()),
770 )
771 .bind(
772 input
773 .telegram_username
774 .as_deref()
775 .map(|s| s.trim())
776 .filter(|s| !s.is_empty()),
777 )
778 .bind(
779 input
780 .signal_number
781 .as_deref()
782 .map(|s| s.trim())
783 .filter(|s| !s.is_empty()),
784 )
785 .bind(is_first_user)
786 .bind(deactivated_at)
787 .bind(false)
788 .fetch_one(&mut *tx)
789 .await;
790 let user_id = match user_insert {
791 Ok((id,)) => id,
792 Err(e) => {
793 if let Some(db_err) = e.as_database_error()
794 && db_err.code().as_deref() == Some("23505")
795 {
796 let constraint = db_err.constraint().unwrap_or("");
797 if constraint.contains("handle") || constraint.contains("users_handle") {
798 return (
799 StatusCode::BAD_REQUEST,
800 Json(json!({
801 "error": "HandleNotAvailable",
802 "message": "Handle already taken"
803 })),
804 )
805 .into_response();
806 } else if constraint.contains("email") || constraint.contains("users_email") {
807 return (
808 StatusCode::BAD_REQUEST,
809 Json(json!({
810 "error": "InvalidEmail",
811 "message": "Email already registered"
812 })),
813 )
814 .into_response();
815 } else if constraint.contains("did") || constraint.contains("users_did") {
816 return (
817 StatusCode::BAD_REQUEST,
818 Json(json!({
819 "error": "AccountAlreadyExists",
820 "message": "An account with this DID already exists"
821 })),
822 )
823 .into_response();
824 }
825 }
826 error!("Error inserting user: {:?}", e);
827 return (
828 StatusCode::INTERNAL_SERVER_ERROR,
829 Json(json!({"error": "InternalError"})),
830 )
831 .into_response();
832 }
833 };
834
835 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
836 Ok(enc) => enc,
837 Err(e) => {
838 error!("Error encrypting user key: {:?}", e);
839 return (
840 StatusCode::INTERNAL_SERVER_ERROR,
841 Json(json!({"error": "InternalError"})),
842 )
843 .into_response();
844 }
845 };
846 let key_insert = sqlx::query!(
847 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
848 user_id,
849 &encrypted_key_bytes[..],
850 crate::config::ENCRYPTION_VERSION
851 )
852 .execute(&mut *tx)
853 .await;
854 if let Err(e) = key_insert {
855 error!("Error inserting user key: {:?}", e);
856 return (
857 StatusCode::INTERNAL_SERVER_ERROR,
858 Json(json!({"error": "InternalError"})),
859 )
860 .into_response();
861 }
862 if let Some(key_id) = reserved_key_id {
863 let mark_used = sqlx::query!(
864 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
865 key_id
866 )
867 .execute(&mut *tx)
868 .await;
869 if let Err(e) = mark_used {
870 error!("Error marking reserved key as used: {:?}", e);
871 return (
872 StatusCode::INTERNAL_SERVER_ERROR,
873 Json(json!({"error": "InternalError"})),
874 )
875 .into_response();
876 }
877 }
878 let mst = Mst::new(Arc::new(state.block_store.clone()));
879 let mst_root = match mst.persist().await {
880 Ok(c) => c,
881 Err(e) => {
882 error!("Error persisting MST: {:?}", e);
883 return (
884 StatusCode::INTERNAL_SERVER_ERROR,
885 Json(json!({"error": "InternalError"})),
886 )
887 .into_response();
888 }
889 };
890 let rev = Tid::now(LimitedU32::MIN);
891 let (commit_bytes, _sig) =
892 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
893 Ok(result) => result,
894 Err(e) => {
895 error!("Error creating genesis commit: {:?}", e);
896 return (
897 StatusCode::INTERNAL_SERVER_ERROR,
898 Json(json!({"error": "InternalError"})),
899 )
900 .into_response();
901 }
902 };
903 let commit_cid = match state.block_store.put(&commit_bytes).await {
904 Ok(c) => c,
905 Err(e) => {
906 error!("Error saving genesis commit: {:?}", e);
907 return (
908 StatusCode::INTERNAL_SERVER_ERROR,
909 Json(json!({"error": "InternalError"})),
910 )
911 .into_response();
912 }
913 };
914 let commit_cid_str = commit_cid.to_string();
915 let repo_insert = sqlx::query!(
916 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
917 user_id,
918 commit_cid_str
919 )
920 .execute(&mut *tx)
921 .await;
922 if let Err(e) = repo_insert {
923 error!("Error initializing repo: {:?}", e);
924 return (
925 StatusCode::INTERNAL_SERVER_ERROR,
926 Json(json!({"error": "InternalError"})),
927 )
928 .into_response();
929 }
930 if let Some(code) = &input.invite_code
931 && !code.trim().is_empty()
932 {
933 let use_insert = sqlx::query!(
934 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
935 code,
936 user_id
937 )
938 .execute(&mut *tx)
939 .await;
940 if let Err(e) = use_insert {
941 error!("Error recording invite usage: {:?}", e);
942 return (
943 StatusCode::INTERNAL_SERVER_ERROR,
944 Json(json!({"error": "InternalError"})),
945 )
946 .into_response();
947 }
948 }
949 if let Err(e) = tx.commit().await {
950 error!("Error committing transaction: {:?}", e);
951 return (
952 StatusCode::INTERNAL_SERVER_ERROR,
953 Json(json!({"error": "InternalError"})),
954 )
955 .into_response();
956 }
957 if !is_migration && !is_did_web_byod {
958 if let Err(e) =
959 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
960 {
961 warn!("Failed to sequence identity event for {}: {}", did, e);
962 }
963 if let Err(e) =
964 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
965 {
966 warn!("Failed to sequence account event for {}: {}", did, e);
967 }
968 let profile_record = json!({
969 "$type": "app.bsky.actor.profile",
970 "displayName": input.handle
971 });
972 if let Err(e) = crate::api::repo::record::create_record_internal(
973 &state,
974 &did,
975 "app.bsky.actor.profile",
976 "self",
977 &profile_record,
978 )
979 .await
980 {
981 warn!("Failed to create default profile for {}: {}", did, e);
982 }
983 }
984 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
985 if !is_migration {
986 if let Some(ref recipient) = verification_recipient {
987 let verification_token = crate::auth::verification_token::generate_signup_token(
988 &did,
989 verification_channel,
990 recipient,
991 );
992 let formatted_token =
993 crate::auth::verification_token::format_token_for_display(&verification_token);
994 if let Err(e) = crate::comms::enqueue_signup_verification(
995 &state.db,
996 user_id,
997 verification_channel,
998 recipient,
999 &formatted_token,
1000 None,
1001 )
1002 .await
1003 {
1004 warn!(
1005 "Failed to enqueue signup verification notification: {:?}",
1006 e
1007 );
1008 }
1009 }
1010 } else if let Some(ref user_email) = email {
1011 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
1012 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
1013 if let Err(e) = crate::comms::enqueue_migration_verification(
1014 &state.db,
1015 user_id,
1016 user_email,
1017 &formatted_token,
1018 &hostname,
1019 )
1020 .await
1021 {
1022 warn!("Failed to enqueue migration verification email: {:?}", e);
1023 }
1024 }
1025
1026 let (access_jwt, refresh_jwt) = if is_migration {
1027 info!(
1028 "[MIGRATION] createAccount: Creating session tokens for migration did={}",
1029 did
1030 );
1031 let access_meta = match crate::auth::create_access_token_with_metadata(
1032 &did,
1033 &secret_key_bytes,
1034 ) {
1035 Ok(m) => m,
1036 Err(e) => {
1037 error!(
1038 "[MIGRATION] createAccount: Error creating access token for migration: {:?}",
1039 e
1040 );
1041 return (
1042 StatusCode::INTERNAL_SERVER_ERROR,
1043 Json(json!({"error": "InternalError"})),
1044 )
1045 .into_response();
1046 }
1047 };
1048 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
1049 &did,
1050 &secret_key_bytes,
1051 ) {
1052 Ok(m) => m,
1053 Err(e) => {
1054 error!(
1055 "[MIGRATION] createAccount: Error creating refresh token for migration: {:?}",
1056 e
1057 );
1058 return (
1059 StatusCode::INTERNAL_SERVER_ERROR,
1060 Json(json!({"error": "InternalError"})),
1061 )
1062 .into_response();
1063 }
1064 };
1065 if let Err(e) = sqlx::query!(
1066 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1067 did,
1068 access_meta.jti,
1069 refresh_meta.jti,
1070 access_meta.expires_at,
1071 refresh_meta.expires_at
1072 )
1073 .execute(&state.db)
1074 .await
1075 {
1076 error!("[MIGRATION] createAccount: Error creating session for migration: {:?}", e);
1077 return (
1078 StatusCode::INTERNAL_SERVER_ERROR,
1079 Json(json!({"error": "InternalError"})),
1080 )
1081 .into_response();
1082 }
1083 info!(
1084 "[MIGRATION] createAccount: Session created successfully for did={}",
1085 did
1086 );
1087 (Some(access_meta.token), Some(refresh_meta.token))
1088 } else {
1089 (None, None)
1090 };
1091
1092 if is_migration {
1093 info!(
1094 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
1095 did, handle
1096 );
1097 }
1098
1099 (
1100 StatusCode::OK,
1101 Json(CreateAccountOutput {
1102 handle: handle.clone(),
1103 did,
1104 access_jwt,
1105 refresh_jwt,
1106 verification_required: !is_migration,
1107 verification_channel: verification_channel.to_string(),
1108 }),
1109 )
1110 .into_response()
1111}