this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
/// Request body accepted by `create_account`.
///
/// Deserialized from JSON with camelCase keys (for example `inviteCode`, `didType`,
/// `signingKey`, `verificationChannel`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle. `create_account` treats a value without a dot, or one ending in
    /// `.<PDS_HOSTNAME>`, as a short handle under this server; any other dotted value is
    /// used as a full handle.
    pub handle: String,
    /// Optional e-mail address; blank values are treated as absent by `create_account`.
    pub email: Option<String>,
    /// Account password; checked with `validate_password` and stored as a bcrypt hash.
    pub password: String,
    /// Optional invite code; required when the `INVITE_CODE_REQUIRED` env var is `true`/`1`.
    pub invite_code: Option<String>,
    /// Optional caller-supplied DID (`did:web:...`, or `did:plc:...` together with a
    /// service token for migration).
    pub did: Option<String>,
    /// DID flavour selector read by `create_account`: `"web"`, `"web-external"`, or anything
    /// else (defaulting to `"plc"` when absent).
    pub did_type: Option<String>,
    /// Optional `did:key` of a previously reserved signing key, looked up in
    /// `reserved_signing_keys`.
    pub signing_key: Option<String>,
    /// Verification channel: `email`, `discord`, `telegram` or `signal`; defaults to `email`.
    pub verification_channel: Option<String>,
    /// Discord ID, required when the verification channel is `discord`.
    pub discord_id: Option<String>,
    /// Telegram username, required when the verification channel is `telegram`.
    pub telegram_username: Option<String>,
    /// Signal phone number, required when the verification channel is `signal`.
    pub signal_number: Option<String>,
}
53
/// Successful response body of `create_account`, serialized as JSON with camelCase keys
/// (for example `didDoc`, `accessJwt`, `refreshJwt`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Full handle assigned to the account.
    pub handle: String,
    /// DID of the account.
    pub did: String,
    /// DID document as returned by the DID resolver; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub did_doc: Option<serde_json::Value>,
    /// Access token for the newly created session.
    pub access_jwt: String,
    /// Refresh token for the newly created session.
    pub refresh_jwt: String,
    /// Whether the account still needs to be verified; `create_account` sets this to
    /// `false` for migrations.
    pub verification_required: bool,
    /// Channel used for verification (`email`, `discord`, `telegram` or `signal`).
    pub verification_channel: String,
}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 let is_potential_migration = input
73 .did
74 .as_ref()
75 .map(|d| d.starts_with("did:plc:"))
76 .unwrap_or(false);
77 if is_potential_migration {
78 info!(
79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
80 input.did, input.handle
81 );
82 } else {
83 info!("create_account called");
84 }
85 let client_ip = extract_client_ip(&headers);
86 if !state
87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
88 .await
89 {
90 warn!(ip = %client_ip, "Account creation rate limit exceeded");
91 return (
92 StatusCode::TOO_MANY_REQUESTS,
93 Json(json!({
94 "error": "RateLimitExceeded",
95 "message": "Too many account creation attempts. Please try again later."
96 })),
97 )
98 .into_response();
99 }
100
101 let migration_auth = if let Some(token) =
102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
103 {
104 if is_service_token(&token) {
105 let verifier = ServiceTokenVerifier::new();
106 match verifier
107 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
108 .await
109 {
110 Ok(claims) => {
111 debug!("Service token verified for migration: iss={}", claims.iss);
112 Some(claims.iss)
113 }
114 Err(e) => {
115 error!("Service token verification failed: {:?}", e);
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({
119 "error": "AuthenticationFailed",
120 "message": format!("Service token verification failed: {}", e)
121 })),
122 )
123 .into_response();
124 }
125 }
126 } else {
127 None
128 }
129 } else {
130 None
131 };
132
133 let is_did_web_byod = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:web:"))
138 .unwrap_or(false);
139
140 let is_migration = migration_auth.is_some()
141 && input
142 .did
143 .as_ref()
144 .map(|d| d.starts_with("did:plc:"))
145 .unwrap_or(false);
146
147 if (is_migration || is_did_web_byod)
148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
149 {
150 if provided_did != auth_did {
151 info!(
152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
153 auth_did, provided_did
154 );
155 return (
156 StatusCode::FORBIDDEN,
157 Json(json!({
158 "error": "AuthorizationError",
159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
160 })),
161 )
162 .into_response();
163 }
164 if is_did_web_byod {
165 info!(did = %provided_did, "Processing did:web BYOD account creation");
166 } else {
167 info!(
168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
169 provided_did
170 );
171 }
172 }
173
174 let hostname_for_validation =
175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let pds_suffix = format!(".{}", hostname_for_validation);
177
178 let validated_short_handle = if !input.handle.contains('.')
179 || input.handle.ends_with(&pds_suffix)
180 {
181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
182 input
183 .handle
184 .strip_suffix(&pds_suffix)
185 .unwrap_or(&input.handle)
186 } else {
187 &input.handle
188 };
189 match crate::api::validation::validate_short_handle(handle_to_validate) {
190 Ok(h) => h,
191 Err(crate::api::validation::HandleValidationError::Reserved) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "HandleNotAvailable", "message": "Reserved handle"})),
195 )
196 .into_response();
197 }
198 Err(e) => {
199 return (
200 StatusCode::BAD_REQUEST,
201 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
202 )
203 .into_response();
204 }
205 }
206 } else {
207 if input.handle.contains(' ') || input.handle.contains('\t') {
208 return (
209 StatusCode::BAD_REQUEST,
210 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
211 )
212 .into_response();
213 }
214 for c in input.handle.chars() {
215 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
216 return (
217 StatusCode::BAD_REQUEST,
218 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
219 )
220 .into_response();
221 }
222 }
223 let handle_lower = input.handle.to_lowercase();
224 if crate::moderation::has_explicit_slur(&handle_lower) {
225 return (
226 StatusCode::BAD_REQUEST,
227 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
228 )
229 .into_response();
230 }
231 handle_lower
232 };
233 let email: Option<String> = input
234 .email
235 .as_ref()
236 .map(|e| e.trim().to_string())
237 .filter(|e| !e.is_empty());
238 if let Some(ref email) = email
239 && !crate::api::validation::is_valid_email(email)
240 {
241 return (
242 StatusCode::BAD_REQUEST,
243 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
244 )
245 .into_response();
246 }
247 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
248 let valid_channels = ["email", "discord", "telegram", "signal"];
249 if !valid_channels.contains(&verification_channel) && !is_migration {
250 return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
253 )
254 .into_response();
255 }
256 let verification_recipient = if is_migration {
257 None
258 } else {
259 Some(match verification_channel {
260 "email" => match &input.email {
261 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
265 ).into_response(),
266 },
267 "discord" => match &input.discord_id {
268 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
269 _ => return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
272 ).into_response(),
273 },
274 "telegram" => match &input.telegram_username {
275 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
276 _ => return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
279 ).into_response(),
280 },
281 "signal" => match &input.signal_number {
282 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
283 _ => return (
284 StatusCode::BAD_REQUEST,
285 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
286 ).into_response(),
287 },
288 _ => return (
289 StatusCode::BAD_REQUEST,
290 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
291 ).into_response(),
292 })
293 };
294 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
295 let pds_endpoint = format!("https://{}", hostname);
296 let suffix = format!(".{}", hostname);
297 let handle = if input.handle.ends_with(&suffix) {
298 format!("{}.{}", validated_short_handle, hostname)
299 } else if input.handle.contains('.') {
300 validated_short_handle.clone()
301 } else {
302 format!("{}.{}", validated_short_handle, hostname)
303 };
304 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
305 if let Some(signing_key_did) = &input.signing_key {
306 let reserved = sqlx::query!(
307 r#"
308 SELECT id, private_key_bytes
309 FROM reserved_signing_keys
310 WHERE public_key_did_key = $1
311 AND used_at IS NULL
312 AND expires_at > NOW()
313 FOR UPDATE
314 "#,
315 signing_key_did
316 )
317 .fetch_optional(&state.db)
318 .await;
319 match reserved {
320 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
321 Ok(None) => {
322 return (
323 StatusCode::BAD_REQUEST,
324 Json(json!({
325 "error": "InvalidSigningKey",
326 "message": "Signing key not found, already used, or expired"
327 })),
328 )
329 .into_response();
330 }
331 Err(e) => {
332 error!("Error looking up reserved signing key: {:?}", e);
333 return (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(json!({"error": "InternalError"})),
336 )
337 .into_response();
338 }
339 }
340 } else {
341 let secret_key = SecretKey::random(&mut OsRng);
342 (secret_key.to_bytes().to_vec(), None)
343 };
344 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
345 Ok(k) => k,
346 Err(e) => {
347 error!("Error creating signing key: {:?}", e);
348 return (
349 StatusCode::INTERNAL_SERVER_ERROR,
350 Json(json!({"error": "InternalError"})),
351 )
352 .into_response();
353 }
354 };
355 let did_type = input.did_type.as_deref().unwrap_or("plc");
356 let did = match did_type {
357 "web" => {
358 if !crate::api::server::meta::is_self_hosted_did_web_enabled() {
359 return (
360 StatusCode::BAD_REQUEST,
361 Json(json!({
362 "error": "SelfHostedDidWebDisabled",
363 "message": "This PDS does not offer self-hosted did:web identities. Please use did:plc or bring your own did:web."
364 })),
365 )
366 .into_response();
367 }
368 let subdomain_host = format!("{}.{}", input.handle, hostname);
369 let encoded_subdomain = subdomain_host.replace(':', "%3A");
370 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
371 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
372 self_hosted_did
373 }
374 "web-external" => {
375 let d = match &input.did {
376 Some(d) if !d.trim().is_empty() => d,
377 _ => {
378 return (
379 StatusCode::BAD_REQUEST,
380 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
381 )
382 .into_response();
383 }
384 };
385 if !d.starts_with("did:web:") {
386 return (
387 StatusCode::BAD_REQUEST,
388 Json(
389 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
390 ),
391 )
392 .into_response();
393 }
394 if !is_did_web_byod
395 && let Err(e) =
396 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
397 {
398 return (
399 StatusCode::BAD_REQUEST,
400 Json(json!({"error": "InvalidDid", "message": e})),
401 )
402 .into_response();
403 }
404 info!(did = %d, "Creating external did:web account");
405 d.clone()
406 }
407 _ => {
408 if let Some(d) = &input.did {
409 if d.starts_with("did:plc:") && is_migration {
410 info!(did = %d, "Migration with existing did:plc");
411 d.clone()
412 } else if d.starts_with("did:web:") {
413 if !is_did_web_byod
414 && let Err(e) = verify_did_web(
415 d,
416 &hostname,
417 &input.handle,
418 input.signing_key.as_deref(),
419 )
420 .await
421 {
422 return (
423 StatusCode::BAD_REQUEST,
424 Json(json!({"error": "InvalidDid", "message": e})),
425 )
426 .into_response();
427 }
428 d.clone()
429 } else if !d.trim().is_empty() {
430 return (
431 StatusCode::BAD_REQUEST,
432 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
433 )
434 .into_response();
435 } else {
436 let rotation_key = std::env::var("PLC_ROTATION_KEY")
437 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
438 let genesis_result = match create_genesis_operation(
439 &signing_key,
440 &rotation_key,
441 &handle,
442 &pds_endpoint,
443 ) {
444 Ok(r) => r,
445 Err(e) => {
446 error!("Error creating PLC genesis operation: {:?}", e);
447 return (
448 StatusCode::INTERNAL_SERVER_ERROR,
449 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
450 )
451 .into_response();
452 }
453 };
454 let plc_client = PlcClient::new(None);
455 if let Err(e) = plc_client
456 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
457 .await
458 {
459 error!("Failed to submit PLC genesis operation: {:?}", e);
460 return (
461 StatusCode::BAD_GATEWAY,
462 Json(json!({
463 "error": "UpstreamError",
464 "message": format!("Failed to register DID with PLC directory: {}", e)
465 })),
466 )
467 .into_response();
468 }
469 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
470 genesis_result.did
471 }
472 } else {
473 let rotation_key = std::env::var("PLC_ROTATION_KEY")
474 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
475 let genesis_result = match create_genesis_operation(
476 &signing_key,
477 &rotation_key,
478 &handle,
479 &pds_endpoint,
480 ) {
481 Ok(r) => r,
482 Err(e) => {
483 error!("Error creating PLC genesis operation: {:?}", e);
484 return (
485 StatusCode::INTERNAL_SERVER_ERROR,
486 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
487 )
488 .into_response();
489 }
490 };
491 let plc_client = PlcClient::new(None);
492 if let Err(e) = plc_client
493 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
494 .await
495 {
496 error!("Failed to submit PLC genesis operation: {:?}", e);
497 return (
498 StatusCode::BAD_GATEWAY,
499 Json(json!({
500 "error": "UpstreamError",
501 "message": format!("Failed to register DID with PLC directory: {}", e)
502 })),
503 )
504 .into_response();
505 }
506 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
507 genesis_result.did
508 }
509 }
510 };
511 let mut tx = match state.db.begin().await {
512 Ok(tx) => tx,
513 Err(e) => {
514 error!("Error starting transaction: {:?}", e);
515 return (
516 StatusCode::INTERNAL_SERVER_ERROR,
517 Json(json!({"error": "InternalError"})),
518 )
519 .into_response();
520 }
521 };
522 if is_migration {
523 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
524 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
525 .bind(&did)
526 .fetch_optional(&mut *tx)
527 .await
528 .unwrap_or(None);
529 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
530 if deactivated_at.is_some() {
531 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
532 let update_result: Result<_, sqlx::Error> =
533 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
534 .bind(&handle)
535 .bind(account_id)
536 .execute(&mut *tx)
537 .await;
538 if let Err(e) = update_result {
539 if let Some(db_err) = e.as_database_error()
540 && db_err
541 .constraint()
542 .map(|c| c.contains("handle"))
543 .unwrap_or(false)
544 {
545 return (
546 StatusCode::BAD_REQUEST,
547 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
548 )
549 .into_response();
550 }
551 error!("Error reactivating account: {:?}", e);
552 return (
553 StatusCode::INTERNAL_SERVER_ERROR,
554 Json(json!({"error": "InternalError"})),
555 )
556 .into_response();
557 }
558 if let Err(e) = tx.commit().await {
559 error!("Error committing reactivation: {:?}", e);
560 return (
561 StatusCode::INTERNAL_SERVER_ERROR,
562 Json(json!({"error": "InternalError"})),
563 )
564 .into_response();
565 }
566 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
567 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
568 )
569 .bind(account_id)
570 .fetch_optional(&state.db)
571 .await
572 .unwrap_or(None);
573 let secret_key_bytes = match key_row {
574 Some((key_bytes, encryption_version)) => {
575 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
576 Ok(k) => k,
577 Err(e) => {
578 error!("Error decrypting key for reactivated account: {:?}", e);
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError"})),
582 )
583 .into_response();
584 }
585 }
586 }
587 None => {
588 error!("No signing key found for reactivated account");
589 return (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
592 )
593 .into_response();
594 }
595 };
596 let access_meta =
597 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
598 Ok(m) => m,
599 Err(e) => {
600 error!("Error creating access token: {:?}", e);
601 return (
602 StatusCode::INTERNAL_SERVER_ERROR,
603 Json(json!({"error": "InternalError"})),
604 )
605 .into_response();
606 }
607 };
608 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
609 &did,
610 &secret_key_bytes,
611 ) {
612 Ok(m) => m,
613 Err(e) => {
614 error!("Error creating refresh token: {:?}", e);
615 return (
616 StatusCode::INTERNAL_SERVER_ERROR,
617 Json(json!({"error": "InternalError"})),
618 )
619 .into_response();
620 }
621 };
622 let session_result: Result<_, sqlx::Error> = sqlx::query(
623 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
624 )
625 .bind(&did)
626 .bind(&access_meta.jti)
627 .bind(&refresh_meta.jti)
628 .bind(access_meta.expires_at)
629 .bind(refresh_meta.expires_at)
630 .execute(&state.db)
631 .await;
632 if let Err(e) = session_result {
633 error!("Error creating session: {:?}", e);
634 return (
635 StatusCode::INTERNAL_SERVER_ERROR,
636 Json(json!({"error": "InternalError"})),
637 )
638 .into_response();
639 }
640 return (
641 StatusCode::OK,
642 Json(CreateAccountOutput {
643 handle: handle.clone(),
644 did: did.clone(),
645 did_doc: state.did_resolver.resolve_did_document(&did).await,
646 access_jwt: access_meta.token,
647 refresh_jwt: refresh_meta.token,
648 verification_required: false,
649 verification_channel: "email".to_string(),
650 }),
651 )
652 .into_response();
653 } else {
654 return (
655 StatusCode::BAD_REQUEST,
656 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
657 )
658 .into_response();
659 }
660 }
661 }
662 let exists_result: Option<(i32,)> =
663 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
664 .bind(&handle)
665 .fetch_optional(&mut *tx)
666 .await
667 .unwrap_or(None);
668 if exists_result.is_some() {
669 return (
670 StatusCode::BAD_REQUEST,
671 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
672 )
673 .into_response();
674 }
675 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
676 .map(|v| v == "true" || v == "1")
677 .unwrap_or(false);
678 if invite_code_required
679 && input
680 .invite_code
681 .as_ref()
682 .map(|c| c.trim().is_empty())
683 .unwrap_or(true)
684 {
685 return (
686 StatusCode::BAD_REQUEST,
687 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
688 )
689 .into_response();
690 }
691 if let Some(code) = &input.invite_code
692 && !code.trim().is_empty()
693 {
694 let invite_query = sqlx::query!(
695 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
696 code
697 )
698 .fetch_optional(&mut *tx)
699 .await;
700 match invite_query {
701 Ok(Some(row)) => {
702 if row.available_uses <= 0 {
703 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
704 }
705 let update_invite = sqlx::query!(
706 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
707 code
708 )
709 .execute(&mut *tx)
710 .await;
711 if let Err(e) = update_invite {
712 error!("Error updating invite code: {:?}", e);
713 return (
714 StatusCode::INTERNAL_SERVER_ERROR,
715 Json(json!({"error": "InternalError"})),
716 )
717 .into_response();
718 }
719 }
720 Ok(None) => {
721 return (
722 StatusCode::BAD_REQUEST,
723 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
724 )
725 .into_response();
726 }
727 Err(e) => {
728 error!("Error checking invite code: {:?}", e);
729 return (
730 StatusCode::INTERNAL_SERVER_ERROR,
731 Json(json!({"error": "InternalError"})),
732 )
733 .into_response();
734 }
735 }
736 }
737 if let Err(e) = validate_password(&input.password) {
738 return (
739 StatusCode::BAD_REQUEST,
740 Json(json!({
741 "error": "InvalidPassword",
742 "message": e.to_string()
743 })),
744 )
745 .into_response();
746 }
747
748 let password_hash = match hash(&input.password, DEFAULT_COST) {
749 Ok(h) => h,
750 Err(e) => {
751 error!("Error hashing password: {:?}", e);
752 return (
753 StatusCode::INTERNAL_SERVER_ERROR,
754 Json(json!({"error": "InternalError"})),
755 )
756 .into_response();
757 }
758 };
759 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
760 .fetch_one(&mut *tx)
761 .await
762 .map(|c| c.unwrap_or(0) == 0)
763 .unwrap_or(false);
764 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
765 Some(chrono::Utc::now())
766 } else {
767 None
768 };
769 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
770 r#"INSERT INTO users (
771 handle, email, did, password_hash,
772 preferred_comms_channel,
773 discord_id, telegram_username, signal_number,
774 is_admin, deactivated_at, email_verified
775 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
776 )
777 .bind(&handle)
778 .bind(&email)
779 .bind(&did)
780 .bind(&password_hash)
781 .bind(verification_channel)
782 .bind(
783 input
784 .discord_id
785 .as_deref()
786 .map(|s| s.trim())
787 .filter(|s| !s.is_empty()),
788 )
789 .bind(
790 input
791 .telegram_username
792 .as_deref()
793 .map(|s| s.trim())
794 .filter(|s| !s.is_empty()),
795 )
796 .bind(
797 input
798 .signal_number
799 .as_deref()
800 .map(|s| s.trim())
801 .filter(|s| !s.is_empty()),
802 )
803 .bind(is_first_user)
804 .bind(deactivated_at)
805 .bind(false)
806 .fetch_one(&mut *tx)
807 .await;
808 let user_id = match user_insert {
809 Ok((id,)) => id,
810 Err(e) => {
811 if let Some(db_err) = e.as_database_error()
812 && db_err.code().as_deref() == Some("23505")
813 {
814 let constraint = db_err.constraint().unwrap_or("");
815 if constraint.contains("handle") || constraint.contains("users_handle") {
816 return (
817 StatusCode::BAD_REQUEST,
818 Json(json!({
819 "error": "HandleNotAvailable",
820 "message": "Handle already taken"
821 })),
822 )
823 .into_response();
824 } else if constraint.contains("email") || constraint.contains("users_email") {
825 return (
826 StatusCode::BAD_REQUEST,
827 Json(json!({
828 "error": "InvalidEmail",
829 "message": "Email already registered"
830 })),
831 )
832 .into_response();
833 } else if constraint.contains("did") || constraint.contains("users_did") {
834 return (
835 StatusCode::BAD_REQUEST,
836 Json(json!({
837 "error": "AccountAlreadyExists",
838 "message": "An account with this DID already exists"
839 })),
840 )
841 .into_response();
842 }
843 }
844 error!("Error inserting user: {:?}", e);
845 return (
846 StatusCode::INTERNAL_SERVER_ERROR,
847 Json(json!({"error": "InternalError"})),
848 )
849 .into_response();
850 }
851 };
852
853 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
854 Ok(enc) => enc,
855 Err(e) => {
856 error!("Error encrypting user key: {:?}", e);
857 return (
858 StatusCode::INTERNAL_SERVER_ERROR,
859 Json(json!({"error": "InternalError"})),
860 )
861 .into_response();
862 }
863 };
864 let key_insert = sqlx::query!(
865 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
866 user_id,
867 &encrypted_key_bytes[..],
868 crate::config::ENCRYPTION_VERSION
869 )
870 .execute(&mut *tx)
871 .await;
872 if let Err(e) = key_insert {
873 error!("Error inserting user key: {:?}", e);
874 return (
875 StatusCode::INTERNAL_SERVER_ERROR,
876 Json(json!({"error": "InternalError"})),
877 )
878 .into_response();
879 }
880 if let Some(key_id) = reserved_key_id {
881 let mark_used = sqlx::query!(
882 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
883 key_id
884 )
885 .execute(&mut *tx)
886 .await;
887 if let Err(e) = mark_used {
888 error!("Error marking reserved key as used: {:?}", e);
889 return (
890 StatusCode::INTERNAL_SERVER_ERROR,
891 Json(json!({"error": "InternalError"})),
892 )
893 .into_response();
894 }
895 }
896 let mst = Mst::new(Arc::new(state.block_store.clone()));
897 let mst_root = match mst.persist().await {
898 Ok(c) => c,
899 Err(e) => {
900 error!("Error persisting MST: {:?}", e);
901 return (
902 StatusCode::INTERNAL_SERVER_ERROR,
903 Json(json!({"error": "InternalError"})),
904 )
905 .into_response();
906 }
907 };
908 let rev = Tid::now(LimitedU32::MIN);
909 let (commit_bytes, _sig) =
910 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
911 Ok(result) => result,
912 Err(e) => {
913 error!("Error creating genesis commit: {:?}", e);
914 return (
915 StatusCode::INTERNAL_SERVER_ERROR,
916 Json(json!({"error": "InternalError"})),
917 )
918 .into_response();
919 }
920 };
921 let commit_cid = match state.block_store.put(&commit_bytes).await {
922 Ok(c) => c,
923 Err(e) => {
924 error!("Error saving genesis commit: {:?}", e);
925 return (
926 StatusCode::INTERNAL_SERVER_ERROR,
927 Json(json!({"error": "InternalError"})),
928 )
929 .into_response();
930 }
931 };
932 let commit_cid_str = commit_cid.to_string();
933 let rev_str = rev.as_ref().to_string();
934 let repo_insert = sqlx::query!(
935 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
936 user_id,
937 commit_cid_str,
938 rev_str
939 )
940 .execute(&mut *tx)
941 .await;
942 if let Err(e) = repo_insert {
943 error!("Error initializing repo: {:?}", e);
944 return (
945 StatusCode::INTERNAL_SERVER_ERROR,
946 Json(json!({"error": "InternalError"})),
947 )
948 .into_response();
949 }
950 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
951 if let Err(e) = sqlx::query!(
952 r#"
953 INSERT INTO user_blocks (user_id, block_cid)
954 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
955 ON CONFLICT (user_id, block_cid) DO NOTHING
956 "#,
957 user_id,
958 &genesis_block_cids
959 )
960 .execute(&mut *tx)
961 .await
962 {
963 error!("Error inserting user_blocks: {:?}", e);
964 return (
965 StatusCode::INTERNAL_SERVER_ERROR,
966 Json(json!({"error": "InternalError"})),
967 )
968 .into_response();
969 }
970 if let Some(code) = &input.invite_code
971 && !code.trim().is_empty()
972 {
973 let use_insert = sqlx::query!(
974 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
975 code,
976 user_id
977 )
978 .execute(&mut *tx)
979 .await;
980 if let Err(e) = use_insert {
981 error!("Error recording invite usage: {:?}", e);
982 return (
983 StatusCode::INTERNAL_SERVER_ERROR,
984 Json(json!({"error": "InternalError"})),
985 )
986 .into_response();
987 }
988 }
989 if let Err(e) = tx.commit().await {
990 error!("Error committing transaction: {:?}", e);
991 return (
992 StatusCode::INTERNAL_SERVER_ERROR,
993 Json(json!({"error": "InternalError"})),
994 )
995 .into_response();
996 }
997 if !is_migration && !is_did_web_byod {
998 if let Err(e) =
999 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
1000 {
1001 warn!("Failed to sequence identity event for {}: {}", did, e);
1002 }
1003 if let Err(e) =
1004 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
1005 {
1006 warn!("Failed to sequence account event for {}: {}", did, e);
1007 }
1008 if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
1009 &state,
1010 &did,
1011 &commit_cid,
1012 &mst_root,
1013 &rev_str,
1014 )
1015 .await
1016 {
1017 warn!("Failed to sequence commit event for {}: {}", did, e);
1018 }
1019 if let Err(e) = crate::api::repo::record::sequence_sync_event(
1020 &state,
1021 &did,
1022 &commit_cid_str,
1023 Some(rev.as_ref()),
1024 )
1025 .await
1026 {
1027 warn!("Failed to sequence sync event for {}: {}", did, e);
1028 }
1029 let profile_record = json!({
1030 "$type": "app.bsky.actor.profile",
1031 "displayName": input.handle
1032 });
1033 if let Err(e) = crate::api::repo::record::create_record_internal(
1034 &state,
1035 &did,
1036 "app.bsky.actor.profile",
1037 "self",
1038 &profile_record,
1039 )
1040 .await
1041 {
1042 warn!("Failed to create default profile for {}: {}", did, e);
1043 }
1044 }
1045 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
1046 if !is_migration {
1047 if let Some(ref recipient) = verification_recipient {
1048 let verification_token = crate::auth::verification_token::generate_signup_token(
1049 &did,
1050 verification_channel,
1051 recipient,
1052 );
1053 let formatted_token =
1054 crate::auth::verification_token::format_token_for_display(&verification_token);
1055 if let Err(e) = crate::comms::enqueue_signup_verification(
1056 &state.db,
1057 user_id,
1058 verification_channel,
1059 recipient,
1060 &formatted_token,
1061 None,
1062 )
1063 .await
1064 {
1065 warn!(
1066 "Failed to enqueue signup verification notification: {:?}",
1067 e
1068 );
1069 }
1070 }
1071 } else if let Some(ref user_email) = email {
1072 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
1073 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
1074 if let Err(e) = crate::comms::enqueue_migration_verification(
1075 &state.db,
1076 user_id,
1077 user_email,
1078 &formatted_token,
1079 &hostname,
1080 )
1081 .await
1082 {
1083 warn!("Failed to enqueue migration verification email: {:?}", e);
1084 }
1085 }
1086
1087 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
1088 {
1089 Ok(m) => m,
1090 Err(e) => {
1091 error!("createAccount: Error creating access token: {:?}", e);
1092 return (
1093 StatusCode::INTERNAL_SERVER_ERROR,
1094 Json(json!({"error": "InternalError"})),
1095 )
1096 .into_response();
1097 }
1098 };
1099 let refresh_meta =
1100 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1101 Ok(m) => m,
1102 Err(e) => {
1103 error!("createAccount: Error creating refresh token: {:?}", e);
1104 return (
1105 StatusCode::INTERNAL_SERVER_ERROR,
1106 Json(json!({"error": "InternalError"})),
1107 )
1108 .into_response();
1109 }
1110 };
1111 if let Err(e) = sqlx::query!(
1112 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1113 did,
1114 access_meta.jti,
1115 refresh_meta.jti,
1116 access_meta.expires_at,
1117 refresh_meta.expires_at
1118 )
1119 .execute(&state.db)
1120 .await
1121 {
1122 error!("createAccount: Error creating session: {:?}", e);
1123 return (
1124 StatusCode::INTERNAL_SERVER_ERROR,
1125 Json(json!({"error": "InternalError"})),
1126 )
1127 .into_response();
1128 }
1129
1130 let did_doc = state.did_resolver.resolve_did_document(&did).await;
1131
1132 if is_migration {
1133 info!(
1134 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
1135 did, handle
1136 );
1137 }
1138
1139 (
1140 StatusCode::OK,
1141 Json(CreateAccountOutput {
1142 handle: handle.clone(),
1143 did,
1144 did_doc,
1145 access_jwt: access_meta.token,
1146 refresh_jwt: refresh_meta.token,
1147 verification_required: !is_migration,
1148 verification_channel: verification_channel.to_string(),
1149 }),
1150 )
1151 .into_response()
1152}