this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
/// Request body accepted by `create_account`, deserialized from camelCase JSON.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle: either a short name (no dot), a name under the PDS
    /// hostname, or a full custom domain handle.
    pub handle: String,
    /// Contact email; blank values are treated as absent. Required when the
    /// verification channel is `email` on a non-migration signup.
    pub email: Option<String>,
    /// Plain-text password; checked with `validate_password` and stored as a
    /// bcrypt hash.
    pub password: String,
    /// Invite code to consume; mandatory when `INVITE_CODE_REQUIRED` is
    /// `true` or `1`.
    pub invite_code: Option<String>,
    /// Pre-existing DID to attach: a `did:web:` DID, or a `did:plc:` DID when
    /// the request carries a matching service token (migration).
    pub did: Option<String>,
    /// DID flavour selector: `"web"`, `"web-external"`, or anything else
    /// (default `"plc"`).
    pub did_type: Option<String>,
    /// `did:key` of a previously reserved signing key, looked up in
    /// `reserved_signing_keys`; a fresh key is generated when absent.
    pub signing_key: Option<String>,
    /// One of `email`, `discord`, `telegram`, `signal`; defaults to `email`.
    pub verification_channel: Option<String>,
    /// Discord identifier, required for the `discord` channel.
    pub discord_id: Option<String>,
    /// Telegram username, required for the `telegram` channel.
    pub telegram_username: Option<String>,
    /// Signal phone number, required for the `signal` channel.
    pub signal_number: Option<String>,
}
53
/// Successful response body of `create_account`, serialized as camelCase JSON.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Final, fully qualified handle assigned to the account.
    pub handle: String,
    /// DID of the created (or reactivated) account.
    pub did: String,
    /// Resolved DID document; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    /// Access token for the newly created session.
    pub access_jwt: String,
    /// Refresh token for the newly created session.
    pub refresh_jwt: String,
    /// `false` for migrations, `true` for every other signup.
    pub verification_required: bool,
    /// Channel the verification message was addressed to.
    pub verification_channel: String,
}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 let is_potential_migration = input
73 .did
74 .as_ref()
75 .map(|d| d.starts_with("did:plc:"))
76 .unwrap_or(false);
77 if is_potential_migration {
78 info!(
79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
80 input.did, input.handle
81 );
82 } else {
83 info!("create_account called");
84 }
85 let client_ip = extract_client_ip(&headers);
86 if !state
87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
88 .await
89 {
90 warn!(ip = %client_ip, "Account creation rate limit exceeded");
91 return (
92 StatusCode::TOO_MANY_REQUESTS,
93 Json(json!({
94 "error": "RateLimitExceeded",
95 "message": "Too many account creation attempts. Please try again later."
96 })),
97 )
98 .into_response();
99 }
100
101 let migration_auth = if let Some(token) =
102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
103 {
104 if is_service_token(&token) {
105 let verifier = ServiceTokenVerifier::new();
106 match verifier
107 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
108 .await
109 {
110 Ok(claims) => {
111 debug!("Service token verified for migration: iss={}", claims.iss);
112 Some(claims.iss)
113 }
114 Err(e) => {
115 error!("Service token verification failed: {:?}", e);
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({
119 "error": "AuthenticationFailed",
120 "message": format!("Service token verification failed: {}", e)
121 })),
122 )
123 .into_response();
124 }
125 }
126 } else {
127 None
128 }
129 } else {
130 None
131 };
132
133 let is_did_web_byod = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:web:"))
138 .unwrap_or(false);
139
140 let is_migration = migration_auth.is_some()
141 && input
142 .did
143 .as_ref()
144 .map(|d| d.starts_with("did:plc:"))
145 .unwrap_or(false);
146
147 if (is_migration || is_did_web_byod)
148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
149 {
150 if provided_did != auth_did {
151 info!(
152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
153 auth_did, provided_did
154 );
155 return (
156 StatusCode::FORBIDDEN,
157 Json(json!({
158 "error": "AuthorizationError",
159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
160 })),
161 )
162 .into_response();
163 }
164 if is_did_web_byod {
165 info!(did = %provided_did, "Processing did:web BYOD account creation");
166 } else {
167 info!(
168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
169 provided_did
170 );
171 }
172 }
173
174 let hostname_for_validation =
175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let pds_suffix = format!(".{}", hostname_for_validation);
177
178 let validated_short_handle = if !input.handle.contains('.')
179 || input.handle.ends_with(&pds_suffix)
180 {
181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
182 input
183 .handle
184 .strip_suffix(&pds_suffix)
185 .unwrap_or(&input.handle)
186 } else {
187 &input.handle
188 };
189 match crate::api::validation::validate_short_handle(handle_to_validate) {
190 Ok(h) => h,
191 Err(crate::api::validation::HandleValidationError::Reserved) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "HandleNotAvailable", "message": "Reserved handle"})),
195 )
196 .into_response();
197 }
198 Err(e) => {
199 return (
200 StatusCode::BAD_REQUEST,
201 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
202 )
203 .into_response();
204 }
205 }
206 } else {
207 if input.handle.contains(' ') || input.handle.contains('\t') {
208 return (
209 StatusCode::BAD_REQUEST,
210 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
211 )
212 .into_response();
213 }
214 for c in input.handle.chars() {
215 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
216 return (
217 StatusCode::BAD_REQUEST,
218 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
219 )
220 .into_response();
221 }
222 }
223 let handle_lower = input.handle.to_lowercase();
224 if crate::moderation::has_explicit_slur(&handle_lower) {
225 return (
226 StatusCode::BAD_REQUEST,
227 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
228 )
229 .into_response();
230 }
231 handle_lower
232 };
233 let email: Option<String> = input
234 .email
235 .as_ref()
236 .map(|e| e.trim().to_string())
237 .filter(|e| !e.is_empty());
238 if let Some(ref email) = email
239 && !crate::api::validation::is_valid_email(email)
240 {
241 return (
242 StatusCode::BAD_REQUEST,
243 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
244 )
245 .into_response();
246 }
247 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
248 let valid_channels = ["email", "discord", "telegram", "signal"];
249 if !valid_channels.contains(&verification_channel) && !is_migration {
250 return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
253 )
254 .into_response();
255 }
256 let verification_recipient = if is_migration {
257 None
258 } else {
259 Some(match verification_channel {
260 "email" => match &input.email {
261 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
265 ).into_response(),
266 },
267 "discord" => match &input.discord_id {
268 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
269 _ => return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
272 ).into_response(),
273 },
274 "telegram" => match &input.telegram_username {
275 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
276 _ => return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
279 ).into_response(),
280 },
281 "signal" => match &input.signal_number {
282 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
283 _ => return (
284 StatusCode::BAD_REQUEST,
285 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
286 ).into_response(),
287 },
288 _ => return (
289 StatusCode::BAD_REQUEST,
290 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
291 ).into_response(),
292 })
293 };
294 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
295 let pds_endpoint = format!("https://{}", hostname);
296 let suffix = format!(".{}", hostname);
297 let handle = if input.handle.ends_with(&suffix) {
298 format!("{}.{}", validated_short_handle, hostname)
299 } else if input.handle.contains('.') {
300 validated_short_handle.clone()
301 } else {
302 format!("{}.{}", validated_short_handle, hostname)
303 };
304 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
305 if let Some(signing_key_did) = &input.signing_key {
306 let reserved = sqlx::query!(
307 r#"
308 SELECT id, private_key_bytes
309 FROM reserved_signing_keys
310 WHERE public_key_did_key = $1
311 AND used_at IS NULL
312 AND expires_at > NOW()
313 FOR UPDATE
314 "#,
315 signing_key_did
316 )
317 .fetch_optional(&state.db)
318 .await;
319 match reserved {
320 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
321 Ok(None) => {
322 return (
323 StatusCode::BAD_REQUEST,
324 Json(json!({
325 "error": "InvalidSigningKey",
326 "message": "Signing key not found, already used, or expired"
327 })),
328 )
329 .into_response();
330 }
331 Err(e) => {
332 error!("Error looking up reserved signing key: {:?}", e);
333 return (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(json!({"error": "InternalError"})),
336 )
337 .into_response();
338 }
339 }
340 } else {
341 let secret_key = SecretKey::random(&mut OsRng);
342 (secret_key.to_bytes().to_vec(), None)
343 };
344 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
345 Ok(k) => k,
346 Err(e) => {
347 error!("Error creating signing key: {:?}", e);
348 return (
349 StatusCode::INTERNAL_SERVER_ERROR,
350 Json(json!({"error": "InternalError"})),
351 )
352 .into_response();
353 }
354 };
355 let did_type = input.did_type.as_deref().unwrap_or("plc");
356 let did = match did_type {
357 "web" => {
358 let subdomain_host = format!("{}.{}", input.handle, hostname);
359 let encoded_subdomain = subdomain_host.replace(':', "%3A");
360 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
361 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
362 self_hosted_did
363 }
364 "web-external" => {
365 let d = match &input.did {
366 Some(d) if !d.trim().is_empty() => d,
367 _ => {
368 return (
369 StatusCode::BAD_REQUEST,
370 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
371 )
372 .into_response();
373 }
374 };
375 if !d.starts_with("did:web:") {
376 return (
377 StatusCode::BAD_REQUEST,
378 Json(
379 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
380 ),
381 )
382 .into_response();
383 }
384 if !is_did_web_byod
385 && let Err(e) =
386 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
387 {
388 return (
389 StatusCode::BAD_REQUEST,
390 Json(json!({"error": "InvalidDid", "message": e})),
391 )
392 .into_response();
393 }
394 info!(did = %d, "Creating external did:web account");
395 d.clone()
396 }
397 _ => {
398 if let Some(d) = &input.did {
399 if d.starts_with("did:plc:") && is_migration {
400 info!(did = %d, "Migration with existing did:plc");
401 d.clone()
402 } else if d.starts_with("did:web:") {
403 if !is_did_web_byod
404 && let Err(e) = verify_did_web(
405 d,
406 &hostname,
407 &input.handle,
408 input.signing_key.as_deref(),
409 )
410 .await
411 {
412 return (
413 StatusCode::BAD_REQUEST,
414 Json(json!({"error": "InvalidDid", "message": e})),
415 )
416 .into_response();
417 }
418 d.clone()
419 } else if !d.trim().is_empty() {
420 return (
421 StatusCode::BAD_REQUEST,
422 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
423 )
424 .into_response();
425 } else {
426 let rotation_key = std::env::var("PLC_ROTATION_KEY")
427 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
428 let genesis_result = match create_genesis_operation(
429 &signing_key,
430 &rotation_key,
431 &handle,
432 &pds_endpoint,
433 ) {
434 Ok(r) => r,
435 Err(e) => {
436 error!("Error creating PLC genesis operation: {:?}", e);
437 return (
438 StatusCode::INTERNAL_SERVER_ERROR,
439 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
440 )
441 .into_response();
442 }
443 };
444 let plc_client = PlcClient::new(None);
445 if let Err(e) = plc_client
446 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
447 .await
448 {
449 error!("Failed to submit PLC genesis operation: {:?}", e);
450 return (
451 StatusCode::BAD_GATEWAY,
452 Json(json!({
453 "error": "UpstreamError",
454 "message": format!("Failed to register DID with PLC directory: {}", e)
455 })),
456 )
457 .into_response();
458 }
459 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
460 genesis_result.did
461 }
462 } else {
463 let rotation_key = std::env::var("PLC_ROTATION_KEY")
464 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
465 let genesis_result = match create_genesis_operation(
466 &signing_key,
467 &rotation_key,
468 &handle,
469 &pds_endpoint,
470 ) {
471 Ok(r) => r,
472 Err(e) => {
473 error!("Error creating PLC genesis operation: {:?}", e);
474 return (
475 StatusCode::INTERNAL_SERVER_ERROR,
476 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
477 )
478 .into_response();
479 }
480 };
481 let plc_client = PlcClient::new(None);
482 if let Err(e) = plc_client
483 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
484 .await
485 {
486 error!("Failed to submit PLC genesis operation: {:?}", e);
487 return (
488 StatusCode::BAD_GATEWAY,
489 Json(json!({
490 "error": "UpstreamError",
491 "message": format!("Failed to register DID with PLC directory: {}", e)
492 })),
493 )
494 .into_response();
495 }
496 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
497 genesis_result.did
498 }
499 }
500 };
501 let mut tx = match state.db.begin().await {
502 Ok(tx) => tx,
503 Err(e) => {
504 error!("Error starting transaction: {:?}", e);
505 return (
506 StatusCode::INTERNAL_SERVER_ERROR,
507 Json(json!({"error": "InternalError"})),
508 )
509 .into_response();
510 }
511 };
512 if is_migration {
513 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
514 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
515 .bind(&did)
516 .fetch_optional(&mut *tx)
517 .await
518 .unwrap_or(None);
519 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
520 if deactivated_at.is_some() {
521 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
522 let update_result: Result<_, sqlx::Error> =
523 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
524 .bind(&handle)
525 .bind(account_id)
526 .execute(&mut *tx)
527 .await;
528 if let Err(e) = update_result {
529 if let Some(db_err) = e.as_database_error()
530 && db_err
531 .constraint()
532 .map(|c| c.contains("handle"))
533 .unwrap_or(false)
534 {
535 return (
536 StatusCode::BAD_REQUEST,
537 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
538 )
539 .into_response();
540 }
541 error!("Error reactivating account: {:?}", e);
542 return (
543 StatusCode::INTERNAL_SERVER_ERROR,
544 Json(json!({"error": "InternalError"})),
545 )
546 .into_response();
547 }
548 if let Err(e) = tx.commit().await {
549 error!("Error committing reactivation: {:?}", e);
550 return (
551 StatusCode::INTERNAL_SERVER_ERROR,
552 Json(json!({"error": "InternalError"})),
553 )
554 .into_response();
555 }
556 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
557 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
558 )
559 .bind(account_id)
560 .fetch_optional(&state.db)
561 .await
562 .unwrap_or(None);
563 let secret_key_bytes = match key_row {
564 Some((key_bytes, encryption_version)) => {
565 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
566 Ok(k) => k,
567 Err(e) => {
568 error!("Error decrypting key for reactivated account: {:?}", e);
569 return (
570 StatusCode::INTERNAL_SERVER_ERROR,
571 Json(json!({"error": "InternalError"})),
572 )
573 .into_response();
574 }
575 }
576 }
577 None => {
578 error!("No signing key found for reactivated account");
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
582 )
583 .into_response();
584 }
585 };
586 let access_meta =
587 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
588 Ok(m) => m,
589 Err(e) => {
590 error!("Error creating access token: {:?}", e);
591 return (
592 StatusCode::INTERNAL_SERVER_ERROR,
593 Json(json!({"error": "InternalError"})),
594 )
595 .into_response();
596 }
597 };
598 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
599 &did,
600 &secret_key_bytes,
601 ) {
602 Ok(m) => m,
603 Err(e) => {
604 error!("Error creating refresh token: {:?}", e);
605 return (
606 StatusCode::INTERNAL_SERVER_ERROR,
607 Json(json!({"error": "InternalError"})),
608 )
609 .into_response();
610 }
611 };
612 let session_result: Result<_, sqlx::Error> = sqlx::query(
613 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
614 )
615 .bind(&did)
616 .bind(&access_meta.jti)
617 .bind(&refresh_meta.jti)
618 .bind(access_meta.expires_at)
619 .bind(refresh_meta.expires_at)
620 .execute(&state.db)
621 .await;
622 if let Err(e) = session_result {
623 error!("Error creating session: {:?}", e);
624 return (
625 StatusCode::INTERNAL_SERVER_ERROR,
626 Json(json!({"error": "InternalError"})),
627 )
628 .into_response();
629 }
630 return (
631 StatusCode::OK,
632 Json(CreateAccountOutput {
633 handle: handle.clone(),
634 did: did.clone(),
635 did_doc: state.did_resolver.resolve_did_document(&did).await,
636 access_jwt: access_meta.token,
637 refresh_jwt: refresh_meta.token,
638 verification_required: false,
639 verification_channel: "email".to_string(),
640 }),
641 )
642 .into_response();
643 } else {
644 return (
645 StatusCode::BAD_REQUEST,
646 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
647 )
648 .into_response();
649 }
650 }
651 }
652 let exists_result: Option<(i32,)> =
653 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
654 .bind(&handle)
655 .fetch_optional(&mut *tx)
656 .await
657 .unwrap_or(None);
658 if exists_result.is_some() {
659 return (
660 StatusCode::BAD_REQUEST,
661 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
662 )
663 .into_response();
664 }
665 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
666 .map(|v| v == "true" || v == "1")
667 .unwrap_or(false);
668 if invite_code_required
669 && input
670 .invite_code
671 .as_ref()
672 .map(|c| c.trim().is_empty())
673 .unwrap_or(true)
674 {
675 return (
676 StatusCode::BAD_REQUEST,
677 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
678 )
679 .into_response();
680 }
681 if let Some(code) = &input.invite_code
682 && !code.trim().is_empty()
683 {
684 let invite_query = sqlx::query!(
685 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
686 code
687 )
688 .fetch_optional(&mut *tx)
689 .await;
690 match invite_query {
691 Ok(Some(row)) => {
692 if row.available_uses <= 0 {
693 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
694 }
695 let update_invite = sqlx::query!(
696 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
697 code
698 )
699 .execute(&mut *tx)
700 .await;
701 if let Err(e) = update_invite {
702 error!("Error updating invite code: {:?}", e);
703 return (
704 StatusCode::INTERNAL_SERVER_ERROR,
705 Json(json!({"error": "InternalError"})),
706 )
707 .into_response();
708 }
709 }
710 Ok(None) => {
711 return (
712 StatusCode::BAD_REQUEST,
713 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
714 )
715 .into_response();
716 }
717 Err(e) => {
718 error!("Error checking invite code: {:?}", e);
719 return (
720 StatusCode::INTERNAL_SERVER_ERROR,
721 Json(json!({"error": "InternalError"})),
722 )
723 .into_response();
724 }
725 }
726 }
727 if let Err(e) = validate_password(&input.password) {
728 return (
729 StatusCode::BAD_REQUEST,
730 Json(json!({
731 "error": "InvalidPassword",
732 "message": e.to_string()
733 })),
734 )
735 .into_response();
736 }
737
738 let password_hash = match hash(&input.password, DEFAULT_COST) {
739 Ok(h) => h,
740 Err(e) => {
741 error!("Error hashing password: {:?}", e);
742 return (
743 StatusCode::INTERNAL_SERVER_ERROR,
744 Json(json!({"error": "InternalError"})),
745 )
746 .into_response();
747 }
748 };
749 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
750 .fetch_one(&mut *tx)
751 .await
752 .map(|c| c.unwrap_or(0) == 0)
753 .unwrap_or(false);
754 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
755 Some(chrono::Utc::now())
756 } else {
757 None
758 };
759 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
760 r#"INSERT INTO users (
761 handle, email, did, password_hash,
762 preferred_comms_channel,
763 discord_id, telegram_username, signal_number,
764 is_admin, deactivated_at, email_verified
765 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
766 )
767 .bind(&handle)
768 .bind(&email)
769 .bind(&did)
770 .bind(&password_hash)
771 .bind(verification_channel)
772 .bind(
773 input
774 .discord_id
775 .as_deref()
776 .map(|s| s.trim())
777 .filter(|s| !s.is_empty()),
778 )
779 .bind(
780 input
781 .telegram_username
782 .as_deref()
783 .map(|s| s.trim())
784 .filter(|s| !s.is_empty()),
785 )
786 .bind(
787 input
788 .signal_number
789 .as_deref()
790 .map(|s| s.trim())
791 .filter(|s| !s.is_empty()),
792 )
793 .bind(is_first_user)
794 .bind(deactivated_at)
795 .bind(false)
796 .fetch_one(&mut *tx)
797 .await;
798 let user_id = match user_insert {
799 Ok((id,)) => id,
800 Err(e) => {
801 if let Some(db_err) = e.as_database_error()
802 && db_err.code().as_deref() == Some("23505")
803 {
804 let constraint = db_err.constraint().unwrap_or("");
805 if constraint.contains("handle") || constraint.contains("users_handle") {
806 return (
807 StatusCode::BAD_REQUEST,
808 Json(json!({
809 "error": "HandleNotAvailable",
810 "message": "Handle already taken"
811 })),
812 )
813 .into_response();
814 } else if constraint.contains("email") || constraint.contains("users_email") {
815 return (
816 StatusCode::BAD_REQUEST,
817 Json(json!({
818 "error": "InvalidEmail",
819 "message": "Email already registered"
820 })),
821 )
822 .into_response();
823 } else if constraint.contains("did") || constraint.contains("users_did") {
824 return (
825 StatusCode::BAD_REQUEST,
826 Json(json!({
827 "error": "AccountAlreadyExists",
828 "message": "An account with this DID already exists"
829 })),
830 )
831 .into_response();
832 }
833 }
834 error!("Error inserting user: {:?}", e);
835 return (
836 StatusCode::INTERNAL_SERVER_ERROR,
837 Json(json!({"error": "InternalError"})),
838 )
839 .into_response();
840 }
841 };
842
843 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
844 Ok(enc) => enc,
845 Err(e) => {
846 error!("Error encrypting user key: {:?}", e);
847 return (
848 StatusCode::INTERNAL_SERVER_ERROR,
849 Json(json!({"error": "InternalError"})),
850 )
851 .into_response();
852 }
853 };
854 let key_insert = sqlx::query!(
855 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
856 user_id,
857 &encrypted_key_bytes[..],
858 crate::config::ENCRYPTION_VERSION
859 )
860 .execute(&mut *tx)
861 .await;
862 if let Err(e) = key_insert {
863 error!("Error inserting user key: {:?}", e);
864 return (
865 StatusCode::INTERNAL_SERVER_ERROR,
866 Json(json!({"error": "InternalError"})),
867 )
868 .into_response();
869 }
870 if let Some(key_id) = reserved_key_id {
871 let mark_used = sqlx::query!(
872 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
873 key_id
874 )
875 .execute(&mut *tx)
876 .await;
877 if let Err(e) = mark_used {
878 error!("Error marking reserved key as used: {:?}", e);
879 return (
880 StatusCode::INTERNAL_SERVER_ERROR,
881 Json(json!({"error": "InternalError"})),
882 )
883 .into_response();
884 }
885 }
886 let mst = Mst::new(Arc::new(state.block_store.clone()));
887 let mst_root = match mst.persist().await {
888 Ok(c) => c,
889 Err(e) => {
890 error!("Error persisting MST: {:?}", e);
891 return (
892 StatusCode::INTERNAL_SERVER_ERROR,
893 Json(json!({"error": "InternalError"})),
894 )
895 .into_response();
896 }
897 };
898 let rev = Tid::now(LimitedU32::MIN);
899 let (commit_bytes, _sig) =
900 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
901 Ok(result) => result,
902 Err(e) => {
903 error!("Error creating genesis commit: {:?}", e);
904 return (
905 StatusCode::INTERNAL_SERVER_ERROR,
906 Json(json!({"error": "InternalError"})),
907 )
908 .into_response();
909 }
910 };
911 let commit_cid = match state.block_store.put(&commit_bytes).await {
912 Ok(c) => c,
913 Err(e) => {
914 error!("Error saving genesis commit: {:?}", e);
915 return (
916 StatusCode::INTERNAL_SERVER_ERROR,
917 Json(json!({"error": "InternalError"})),
918 )
919 .into_response();
920 }
921 };
922 let commit_cid_str = commit_cid.to_string();
923 let rev_str = rev.as_ref().to_string();
924 let repo_insert = sqlx::query!(
925 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
926 user_id,
927 commit_cid_str,
928 rev_str
929 )
930 .execute(&mut *tx)
931 .await;
932 if let Err(e) = repo_insert {
933 error!("Error initializing repo: {:?}", e);
934 return (
935 StatusCode::INTERNAL_SERVER_ERROR,
936 Json(json!({"error": "InternalError"})),
937 )
938 .into_response();
939 }
940 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
941 if let Err(e) = sqlx::query!(
942 r#"
943 INSERT INTO user_blocks (user_id, block_cid)
944 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
945 ON CONFLICT (user_id, block_cid) DO NOTHING
946 "#,
947 user_id,
948 &genesis_block_cids
949 )
950 .execute(&mut *tx)
951 .await
952 {
953 error!("Error inserting user_blocks: {:?}", e);
954 return (
955 StatusCode::INTERNAL_SERVER_ERROR,
956 Json(json!({"error": "InternalError"})),
957 )
958 .into_response();
959 }
960 if let Some(code) = &input.invite_code
961 && !code.trim().is_empty()
962 {
963 let use_insert = sqlx::query!(
964 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
965 code,
966 user_id
967 )
968 .execute(&mut *tx)
969 .await;
970 if let Err(e) = use_insert {
971 error!("Error recording invite usage: {:?}", e);
972 return (
973 StatusCode::INTERNAL_SERVER_ERROR,
974 Json(json!({"error": "InternalError"})),
975 )
976 .into_response();
977 }
978 }
979 if let Err(e) = tx.commit().await {
980 error!("Error committing transaction: {:?}", e);
981 return (
982 StatusCode::INTERNAL_SERVER_ERROR,
983 Json(json!({"error": "InternalError"})),
984 )
985 .into_response();
986 }
987 if !is_migration && !is_did_web_byod {
988 if let Err(e) =
989 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
990 {
991 warn!("Failed to sequence identity event for {}: {}", did, e);
992 }
993 if let Err(e) =
994 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
995 {
996 warn!("Failed to sequence account event for {}: {}", did, e);
997 }
998 if let Err(e) =
999 crate::api::repo::record::sequence_genesis_commit(&state, &did, &commit_cid, &mst_root, &rev_str).await
1000 {
1001 warn!("Failed to sequence commit event for {}: {}", did, e);
1002 }
1003 if let Err(e) = crate::api::repo::record::sequence_sync_event(
1004 &state,
1005 &did,
1006 &commit_cid_str,
1007 Some(rev.as_ref()),
1008 )
1009 .await
1010 {
1011 warn!("Failed to sequence sync event for {}: {}", did, e);
1012 }
1013 let profile_record = json!({
1014 "$type": "app.bsky.actor.profile",
1015 "displayName": input.handle
1016 });
1017 if let Err(e) = crate::api::repo::record::create_record_internal(
1018 &state,
1019 &did,
1020 "app.bsky.actor.profile",
1021 "self",
1022 &profile_record,
1023 )
1024 .await
1025 {
1026 warn!("Failed to create default profile for {}: {}", did, e);
1027 }
1028 }
1029 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
1030 if !is_migration {
1031 if let Some(ref recipient) = verification_recipient {
1032 let verification_token = crate::auth::verification_token::generate_signup_token(
1033 &did,
1034 verification_channel,
1035 recipient,
1036 );
1037 let formatted_token =
1038 crate::auth::verification_token::format_token_for_display(&verification_token);
1039 if let Err(e) = crate::comms::enqueue_signup_verification(
1040 &state.db,
1041 user_id,
1042 verification_channel,
1043 recipient,
1044 &formatted_token,
1045 None,
1046 )
1047 .await
1048 {
1049 warn!(
1050 "Failed to enqueue signup verification notification: {:?}",
1051 e
1052 );
1053 }
1054 }
1055 } else if let Some(ref user_email) = email {
1056 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
1057 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
1058 if let Err(e) = crate::comms::enqueue_migration_verification(
1059 &state.db,
1060 user_id,
1061 user_email,
1062 &formatted_token,
1063 &hostname,
1064 )
1065 .await
1066 {
1067 warn!("Failed to enqueue migration verification email: {:?}", e);
1068 }
1069 }
1070
1071 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
1072 {
1073 Ok(m) => m,
1074 Err(e) => {
1075 error!("createAccount: Error creating access token: {:?}", e);
1076 return (
1077 StatusCode::INTERNAL_SERVER_ERROR,
1078 Json(json!({"error": "InternalError"})),
1079 )
1080 .into_response();
1081 }
1082 };
1083 let refresh_meta =
1084 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1085 Ok(m) => m,
1086 Err(e) => {
1087 error!("createAccount: Error creating refresh token: {:?}", e);
1088 return (
1089 StatusCode::INTERNAL_SERVER_ERROR,
1090 Json(json!({"error": "InternalError"})),
1091 )
1092 .into_response();
1093 }
1094 };
1095 if let Err(e) = sqlx::query!(
1096 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1097 did,
1098 access_meta.jti,
1099 refresh_meta.jti,
1100 access_meta.expires_at,
1101 refresh_meta.expires_at
1102 )
1103 .execute(&state.db)
1104 .await
1105 {
1106 error!("createAccount: Error creating session: {:?}", e);
1107 return (
1108 StatusCode::INTERNAL_SERVER_ERROR,
1109 Json(json!({"error": "InternalError"})),
1110 )
1111 .into_response();
1112 }
1113
1114 let did_doc = state.did_resolver.resolve_did_document(&did).await;
1115
1116 if is_migration {
1117 info!(
1118 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
1119 did, handle
1120 );
1121 }
1122
1123 (
1124 StatusCode::OK,
1125 Json(CreateAccountOutput {
1126 handle: handle.clone(),
1127 did,
1128 did_doc,
1129 access_jwt: access_meta.token,
1130 refresh_jwt: refresh_meta.token,
1131 verification_required: !is_migration,
1132 verification_channel: verification_channel.to_string(),
1133 }),
1134 )
1135 .into_response()
1136}
1137