this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub did_doc: Option<serde_json::Value>,
61 pub access_jwt: String,
62 pub refresh_jwt: String,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 let is_potential_migration = input
73 .did
74 .as_ref()
75 .map(|d| d.starts_with("did:plc:"))
76 .unwrap_or(false);
77 if is_potential_migration {
78 info!(
79 "[MIGRATION] createAccount called for potential migration did={:?} handle={}",
80 input.did, input.handle
81 );
82 } else {
83 info!("create_account called");
84 }
85 let client_ip = extract_client_ip(&headers);
86 if !state
87 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
88 .await
89 {
90 warn!(ip = %client_ip, "Account creation rate limit exceeded");
91 return (
92 StatusCode::TOO_MANY_REQUESTS,
93 Json(json!({
94 "error": "RateLimitExceeded",
95 "message": "Too many account creation attempts. Please try again later."
96 })),
97 )
98 .into_response();
99 }
100
101 let migration_auth = if let Some(token) =
102 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
103 {
104 if is_service_token(&token) {
105 let verifier = ServiceTokenVerifier::new();
106 match verifier
107 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
108 .await
109 {
110 Ok(claims) => {
111 debug!("Service token verified for migration: iss={}", claims.iss);
112 Some(claims.iss)
113 }
114 Err(e) => {
115 error!("Service token verification failed: {:?}", e);
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({
119 "error": "AuthenticationFailed",
120 "message": format!("Service token verification failed: {}", e)
121 })),
122 )
123 .into_response();
124 }
125 }
126 } else {
127 None
128 }
129 } else {
130 None
131 };
132
133 let is_did_web_byod = migration_auth.is_some()
134 && input
135 .did
136 .as_ref()
137 .map(|d| d.starts_with("did:web:"))
138 .unwrap_or(false);
139
140 let is_migration = migration_auth.is_some()
141 && input
142 .did
143 .as_ref()
144 .map(|d| d.starts_with("did:plc:"))
145 .unwrap_or(false);
146
147 if (is_migration || is_did_web_byod)
148 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
149 {
150 if provided_did != auth_did {
151 info!(
152 "[MIGRATION] createAccount: Service token mismatch - token_did={} provided_did={}",
153 auth_did, provided_did
154 );
155 return (
156 StatusCode::FORBIDDEN,
157 Json(json!({
158 "error": "AuthorizationError",
159 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
160 })),
161 )
162 .into_response();
163 }
164 if is_did_web_byod {
165 info!(did = %provided_did, "Processing did:web BYOD account creation");
166 } else {
167 info!(
168 "[MIGRATION] createAccount: Service token verified, processing migration for did={}",
169 provided_did
170 );
171 }
172 }
173
174 let hostname_for_validation =
175 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
176 let pds_suffix = format!(".{}", hostname_for_validation);
177
178 let validated_short_handle = if !input.handle.contains('.')
179 || input.handle.ends_with(&pds_suffix)
180 {
181 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
182 input
183 .handle
184 .strip_suffix(&pds_suffix)
185 .unwrap_or(&input.handle)
186 } else {
187 &input.handle
188 };
189 match crate::api::validation::validate_short_handle(handle_to_validate) {
190 Ok(h) => h,
191 Err(e) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
195 )
196 .into_response();
197 }
198 }
199 } else {
200 if input.handle.contains(' ') || input.handle.contains('\t') {
201 return (
202 StatusCode::BAD_REQUEST,
203 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
204 )
205 .into_response();
206 }
207 for c in input.handle.chars() {
208 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
209 return (
210 StatusCode::BAD_REQUEST,
211 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
212 )
213 .into_response();
214 }
215 }
216 let handle_lower = input.handle.to_lowercase();
217 if crate::moderation::has_explicit_slur(&handle_lower) {
218 return (
219 StatusCode::BAD_REQUEST,
220 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
221 )
222 .into_response();
223 }
224 handle_lower
225 };
226 let email: Option<String> = input
227 .email
228 .as_ref()
229 .map(|e| e.trim().to_string())
230 .filter(|e| !e.is_empty());
231 if let Some(ref email) = email
232 && !crate::api::validation::is_valid_email(email)
233 {
234 return (
235 StatusCode::BAD_REQUEST,
236 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
237 )
238 .into_response();
239 }
240 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
241 let valid_channels = ["email", "discord", "telegram", "signal"];
242 if !valid_channels.contains(&verification_channel) && !is_migration {
243 return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
246 )
247 .into_response();
248 }
249 let verification_recipient = if is_migration {
250 None
251 } else {
252 Some(match verification_channel {
253 "email" => match &input.email {
254 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
255 _ => return (
256 StatusCode::BAD_REQUEST,
257 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
258 ).into_response(),
259 },
260 "discord" => match &input.discord_id {
261 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
265 ).into_response(),
266 },
267 "telegram" => match &input.telegram_username {
268 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
269 _ => return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
272 ).into_response(),
273 },
274 "signal" => match &input.signal_number {
275 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
276 _ => return (
277 StatusCode::BAD_REQUEST,
278 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
279 ).into_response(),
280 },
281 _ => return (
282 StatusCode::BAD_REQUEST,
283 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
284 ).into_response(),
285 })
286 };
287 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
288 let pds_endpoint = format!("https://{}", hostname);
289 let suffix = format!(".{}", hostname);
290 let handle = if input.handle.ends_with(&suffix) {
291 format!("{}.{}", validated_short_handle, hostname)
292 } else if input.handle.contains('.') {
293 validated_short_handle.clone()
294 } else {
295 format!("{}.{}", validated_short_handle, hostname)
296 };
297 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
298 if let Some(signing_key_did) = &input.signing_key {
299 let reserved = sqlx::query!(
300 r#"
301 SELECT id, private_key_bytes
302 FROM reserved_signing_keys
303 WHERE public_key_did_key = $1
304 AND used_at IS NULL
305 AND expires_at > NOW()
306 FOR UPDATE
307 "#,
308 signing_key_did
309 )
310 .fetch_optional(&state.db)
311 .await;
312 match reserved {
313 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
314 Ok(None) => {
315 return (
316 StatusCode::BAD_REQUEST,
317 Json(json!({
318 "error": "InvalidSigningKey",
319 "message": "Signing key not found, already used, or expired"
320 })),
321 )
322 .into_response();
323 }
324 Err(e) => {
325 error!("Error looking up reserved signing key: {:?}", e);
326 return (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(json!({"error": "InternalError"})),
329 )
330 .into_response();
331 }
332 }
333 } else {
334 let secret_key = SecretKey::random(&mut OsRng);
335 (secret_key.to_bytes().to_vec(), None)
336 };
337 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
338 Ok(k) => k,
339 Err(e) => {
340 error!("Error creating signing key: {:?}", e);
341 return (
342 StatusCode::INTERNAL_SERVER_ERROR,
343 Json(json!({"error": "InternalError"})),
344 )
345 .into_response();
346 }
347 };
348 let did_type = input.did_type.as_deref().unwrap_or("plc");
349 let did = match did_type {
350 "web" => {
351 let subdomain_host = format!("{}.{}", input.handle, hostname);
352 let encoded_subdomain = subdomain_host.replace(':', "%3A");
353 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
354 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
355 self_hosted_did
356 }
357 "web-external" => {
358 let d = match &input.did {
359 Some(d) if !d.trim().is_empty() => d,
360 _ => {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
364 )
365 .into_response();
366 }
367 };
368 if !d.starts_with("did:web:") {
369 return (
370 StatusCode::BAD_REQUEST,
371 Json(
372 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
373 ),
374 )
375 .into_response();
376 }
377 if !is_did_web_byod
378 && let Err(e) =
379 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
380 {
381 return (
382 StatusCode::BAD_REQUEST,
383 Json(json!({"error": "InvalidDid", "message": e})),
384 )
385 .into_response();
386 }
387 info!(did = %d, "Creating external did:web account");
388 d.clone()
389 }
390 _ => {
391 if let Some(d) = &input.did {
392 if d.starts_with("did:plc:") && is_migration {
393 info!(did = %d, "Migration with existing did:plc");
394 d.clone()
395 } else if d.starts_with("did:web:") {
396 if !is_did_web_byod
397 && let Err(e) = verify_did_web(
398 d,
399 &hostname,
400 &input.handle,
401 input.signing_key.as_deref(),
402 )
403 .await
404 {
405 return (
406 StatusCode::BAD_REQUEST,
407 Json(json!({"error": "InvalidDid", "message": e})),
408 )
409 .into_response();
410 }
411 d.clone()
412 } else if !d.trim().is_empty() {
413 return (
414 StatusCode::BAD_REQUEST,
415 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
416 )
417 .into_response();
418 } else {
419 let rotation_key = std::env::var("PLC_ROTATION_KEY")
420 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
421 let genesis_result = match create_genesis_operation(
422 &signing_key,
423 &rotation_key,
424 &handle,
425 &pds_endpoint,
426 ) {
427 Ok(r) => r,
428 Err(e) => {
429 error!("Error creating PLC genesis operation: {:?}", e);
430 return (
431 StatusCode::INTERNAL_SERVER_ERROR,
432 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
433 )
434 .into_response();
435 }
436 };
437 let plc_client = PlcClient::new(None);
438 if let Err(e) = plc_client
439 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
440 .await
441 {
442 error!("Failed to submit PLC genesis operation: {:?}", e);
443 return (
444 StatusCode::BAD_GATEWAY,
445 Json(json!({
446 "error": "UpstreamError",
447 "message": format!("Failed to register DID with PLC directory: {}", e)
448 })),
449 )
450 .into_response();
451 }
452 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
453 genesis_result.did
454 }
455 } else {
456 let rotation_key = std::env::var("PLC_ROTATION_KEY")
457 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
458 let genesis_result = match create_genesis_operation(
459 &signing_key,
460 &rotation_key,
461 &handle,
462 &pds_endpoint,
463 ) {
464 Ok(r) => r,
465 Err(e) => {
466 error!("Error creating PLC genesis operation: {:?}", e);
467 return (
468 StatusCode::INTERNAL_SERVER_ERROR,
469 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
470 )
471 .into_response();
472 }
473 };
474 let plc_client = PlcClient::new(None);
475 if let Err(e) = plc_client
476 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
477 .await
478 {
479 error!("Failed to submit PLC genesis operation: {:?}", e);
480 return (
481 StatusCode::BAD_GATEWAY,
482 Json(json!({
483 "error": "UpstreamError",
484 "message": format!("Failed to register DID with PLC directory: {}", e)
485 })),
486 )
487 .into_response();
488 }
489 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
490 genesis_result.did
491 }
492 }
493 };
494 let mut tx = match state.db.begin().await {
495 Ok(tx) => tx,
496 Err(e) => {
497 error!("Error starting transaction: {:?}", e);
498 return (
499 StatusCode::INTERNAL_SERVER_ERROR,
500 Json(json!({"error": "InternalError"})),
501 )
502 .into_response();
503 }
504 };
505 if is_migration {
506 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
507 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
508 .bind(&did)
509 .fetch_optional(&mut *tx)
510 .await
511 .unwrap_or(None);
512 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
513 if deactivated_at.is_some() {
514 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
515 let update_result: Result<_, sqlx::Error> =
516 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
517 .bind(&handle)
518 .bind(account_id)
519 .execute(&mut *tx)
520 .await;
521 if let Err(e) = update_result {
522 if let Some(db_err) = e.as_database_error()
523 && db_err
524 .constraint()
525 .map(|c| c.contains("handle"))
526 .unwrap_or(false)
527 {
528 return (
529 StatusCode::BAD_REQUEST,
530 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
531 )
532 .into_response();
533 }
534 error!("Error reactivating account: {:?}", e);
535 return (
536 StatusCode::INTERNAL_SERVER_ERROR,
537 Json(json!({"error": "InternalError"})),
538 )
539 .into_response();
540 }
541 if let Err(e) = tx.commit().await {
542 error!("Error committing reactivation: {:?}", e);
543 return (
544 StatusCode::INTERNAL_SERVER_ERROR,
545 Json(json!({"error": "InternalError"})),
546 )
547 .into_response();
548 }
549 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
550 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
551 )
552 .bind(account_id)
553 .fetch_optional(&state.db)
554 .await
555 .unwrap_or(None);
556 let secret_key_bytes = match key_row {
557 Some((key_bytes, encryption_version)) => {
558 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
559 Ok(k) => k,
560 Err(e) => {
561 error!("Error decrypting key for reactivated account: {:?}", e);
562 return (
563 StatusCode::INTERNAL_SERVER_ERROR,
564 Json(json!({"error": "InternalError"})),
565 )
566 .into_response();
567 }
568 }
569 }
570 None => {
571 error!("No signing key found for reactivated account");
572 return (
573 StatusCode::INTERNAL_SERVER_ERROR,
574 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
575 )
576 .into_response();
577 }
578 };
579 let access_meta =
580 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
581 Ok(m) => m,
582 Err(e) => {
583 error!("Error creating access token: {:?}", e);
584 return (
585 StatusCode::INTERNAL_SERVER_ERROR,
586 Json(json!({"error": "InternalError"})),
587 )
588 .into_response();
589 }
590 };
591 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
592 &did,
593 &secret_key_bytes,
594 ) {
595 Ok(m) => m,
596 Err(e) => {
597 error!("Error creating refresh token: {:?}", e);
598 return (
599 StatusCode::INTERNAL_SERVER_ERROR,
600 Json(json!({"error": "InternalError"})),
601 )
602 .into_response();
603 }
604 };
605 let session_result: Result<_, sqlx::Error> = sqlx::query(
606 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
607 )
608 .bind(&did)
609 .bind(&access_meta.jti)
610 .bind(&refresh_meta.jti)
611 .bind(access_meta.expires_at)
612 .bind(refresh_meta.expires_at)
613 .execute(&state.db)
614 .await;
615 if let Err(e) = session_result {
616 error!("Error creating session: {:?}", e);
617 return (
618 StatusCode::INTERNAL_SERVER_ERROR,
619 Json(json!({"error": "InternalError"})),
620 )
621 .into_response();
622 }
623 return (
624 StatusCode::OK,
625 Json(CreateAccountOutput {
626 handle: handle.clone(),
627 did: did.clone(),
628 did_doc: state.did_resolver.resolve_did_document(&did).await,
629 access_jwt: access_meta.token,
630 refresh_jwt: refresh_meta.token,
631 verification_required: false,
632 verification_channel: "email".to_string(),
633 }),
634 )
635 .into_response();
636 } else {
637 return (
638 StatusCode::BAD_REQUEST,
639 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
640 )
641 .into_response();
642 }
643 }
644 }
645 let exists_result: Option<(i32,)> =
646 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
647 .bind(&handle)
648 .fetch_optional(&mut *tx)
649 .await
650 .unwrap_or(None);
651 if exists_result.is_some() {
652 return (
653 StatusCode::BAD_REQUEST,
654 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
655 )
656 .into_response();
657 }
658 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
659 .map(|v| v == "true" || v == "1")
660 .unwrap_or(false);
661 if invite_code_required
662 && input
663 .invite_code
664 .as_ref()
665 .map(|c| c.trim().is_empty())
666 .unwrap_or(true)
667 {
668 return (
669 StatusCode::BAD_REQUEST,
670 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
671 )
672 .into_response();
673 }
674 if let Some(code) = &input.invite_code
675 && !code.trim().is_empty()
676 {
677 let invite_query = sqlx::query!(
678 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
679 code
680 )
681 .fetch_optional(&mut *tx)
682 .await;
683 match invite_query {
684 Ok(Some(row)) => {
685 if row.available_uses <= 0 {
686 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
687 }
688 let update_invite = sqlx::query!(
689 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
690 code
691 )
692 .execute(&mut *tx)
693 .await;
694 if let Err(e) = update_invite {
695 error!("Error updating invite code: {:?}", e);
696 return (
697 StatusCode::INTERNAL_SERVER_ERROR,
698 Json(json!({"error": "InternalError"})),
699 )
700 .into_response();
701 }
702 }
703 Ok(None) => {
704 return (
705 StatusCode::BAD_REQUEST,
706 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
707 )
708 .into_response();
709 }
710 Err(e) => {
711 error!("Error checking invite code: {:?}", e);
712 return (
713 StatusCode::INTERNAL_SERVER_ERROR,
714 Json(json!({"error": "InternalError"})),
715 )
716 .into_response();
717 }
718 }
719 }
720 if let Err(e) = validate_password(&input.password) {
721 return (
722 StatusCode::BAD_REQUEST,
723 Json(json!({
724 "error": "InvalidPassword",
725 "message": e.to_string()
726 })),
727 )
728 .into_response();
729 }
730
731 let password_hash = match hash(&input.password, DEFAULT_COST) {
732 Ok(h) => h,
733 Err(e) => {
734 error!("Error hashing password: {:?}", e);
735 return (
736 StatusCode::INTERNAL_SERVER_ERROR,
737 Json(json!({"error": "InternalError"})),
738 )
739 .into_response();
740 }
741 };
742 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
743 .fetch_one(&mut *tx)
744 .await
745 .map(|c| c.unwrap_or(0) == 0)
746 .unwrap_or(false);
747 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
748 Some(chrono::Utc::now())
749 } else {
750 None
751 };
752 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
753 r#"INSERT INTO users (
754 handle, email, did, password_hash,
755 preferred_comms_channel,
756 discord_id, telegram_username, signal_number,
757 is_admin, deactivated_at, email_verified
758 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
759 )
760 .bind(&handle)
761 .bind(&email)
762 .bind(&did)
763 .bind(&password_hash)
764 .bind(verification_channel)
765 .bind(
766 input
767 .discord_id
768 .as_deref()
769 .map(|s| s.trim())
770 .filter(|s| !s.is_empty()),
771 )
772 .bind(
773 input
774 .telegram_username
775 .as_deref()
776 .map(|s| s.trim())
777 .filter(|s| !s.is_empty()),
778 )
779 .bind(
780 input
781 .signal_number
782 .as_deref()
783 .map(|s| s.trim())
784 .filter(|s| !s.is_empty()),
785 )
786 .bind(is_first_user)
787 .bind(deactivated_at)
788 .bind(false)
789 .fetch_one(&mut *tx)
790 .await;
791 let user_id = match user_insert {
792 Ok((id,)) => id,
793 Err(e) => {
794 if let Some(db_err) = e.as_database_error()
795 && db_err.code().as_deref() == Some("23505")
796 {
797 let constraint = db_err.constraint().unwrap_or("");
798 if constraint.contains("handle") || constraint.contains("users_handle") {
799 return (
800 StatusCode::BAD_REQUEST,
801 Json(json!({
802 "error": "HandleNotAvailable",
803 "message": "Handle already taken"
804 })),
805 )
806 .into_response();
807 } else if constraint.contains("email") || constraint.contains("users_email") {
808 return (
809 StatusCode::BAD_REQUEST,
810 Json(json!({
811 "error": "InvalidEmail",
812 "message": "Email already registered"
813 })),
814 )
815 .into_response();
816 } else if constraint.contains("did") || constraint.contains("users_did") {
817 return (
818 StatusCode::BAD_REQUEST,
819 Json(json!({
820 "error": "AccountAlreadyExists",
821 "message": "An account with this DID already exists"
822 })),
823 )
824 .into_response();
825 }
826 }
827 error!("Error inserting user: {:?}", e);
828 return (
829 StatusCode::INTERNAL_SERVER_ERROR,
830 Json(json!({"error": "InternalError"})),
831 )
832 .into_response();
833 }
834 };
835
836 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
837 Ok(enc) => enc,
838 Err(e) => {
839 error!("Error encrypting user key: {:?}", e);
840 return (
841 StatusCode::INTERNAL_SERVER_ERROR,
842 Json(json!({"error": "InternalError"})),
843 )
844 .into_response();
845 }
846 };
847 let key_insert = sqlx::query!(
848 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
849 user_id,
850 &encrypted_key_bytes[..],
851 crate::config::ENCRYPTION_VERSION
852 )
853 .execute(&mut *tx)
854 .await;
855 if let Err(e) = key_insert {
856 error!("Error inserting user key: {:?}", e);
857 return (
858 StatusCode::INTERNAL_SERVER_ERROR,
859 Json(json!({"error": "InternalError"})),
860 )
861 .into_response();
862 }
863 if let Some(key_id) = reserved_key_id {
864 let mark_used = sqlx::query!(
865 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
866 key_id
867 )
868 .execute(&mut *tx)
869 .await;
870 if let Err(e) = mark_used {
871 error!("Error marking reserved key as used: {:?}", e);
872 return (
873 StatusCode::INTERNAL_SERVER_ERROR,
874 Json(json!({"error": "InternalError"})),
875 )
876 .into_response();
877 }
878 }
879 let mst = Mst::new(Arc::new(state.block_store.clone()));
880 let mst_root = match mst.persist().await {
881 Ok(c) => c,
882 Err(e) => {
883 error!("Error persisting MST: {:?}", e);
884 return (
885 StatusCode::INTERNAL_SERVER_ERROR,
886 Json(json!({"error": "InternalError"})),
887 )
888 .into_response();
889 }
890 };
891 let rev = Tid::now(LimitedU32::MIN);
892 let (commit_bytes, _sig) =
893 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
894 Ok(result) => result,
895 Err(e) => {
896 error!("Error creating genesis commit: {:?}", e);
897 return (
898 StatusCode::INTERNAL_SERVER_ERROR,
899 Json(json!({"error": "InternalError"})),
900 )
901 .into_response();
902 }
903 };
904 let commit_cid = match state.block_store.put(&commit_bytes).await {
905 Ok(c) => c,
906 Err(e) => {
907 error!("Error saving genesis commit: {:?}", e);
908 return (
909 StatusCode::INTERNAL_SERVER_ERROR,
910 Json(json!({"error": "InternalError"})),
911 )
912 .into_response();
913 }
914 };
915 let commit_cid_str = commit_cid.to_string();
916 let rev_str = rev.as_ref().to_string();
917 let repo_insert = sqlx::query!(
918 "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
919 user_id,
920 commit_cid_str,
921 rev_str
922 )
923 .execute(&mut *tx)
924 .await;
925 if let Err(e) = repo_insert {
926 error!("Error initializing repo: {:?}", e);
927 return (
928 StatusCode::INTERNAL_SERVER_ERROR,
929 Json(json!({"error": "InternalError"})),
930 )
931 .into_response();
932 }
933 let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
934 if let Err(e) = sqlx::query!(
935 r#"
936 INSERT INTO user_blocks (user_id, block_cid)
937 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
938 ON CONFLICT (user_id, block_cid) DO NOTHING
939 "#,
940 user_id,
941 &genesis_block_cids
942 )
943 .execute(&mut *tx)
944 .await
945 {
946 error!("Error inserting user_blocks: {:?}", e);
947 return (
948 StatusCode::INTERNAL_SERVER_ERROR,
949 Json(json!({"error": "InternalError"})),
950 )
951 .into_response();
952 }
953 if let Some(code) = &input.invite_code
954 && !code.trim().is_empty()
955 {
956 let use_insert = sqlx::query!(
957 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
958 code,
959 user_id
960 )
961 .execute(&mut *tx)
962 .await;
963 if let Err(e) = use_insert {
964 error!("Error recording invite usage: {:?}", e);
965 return (
966 StatusCode::INTERNAL_SERVER_ERROR,
967 Json(json!({"error": "InternalError"})),
968 )
969 .into_response();
970 }
971 }
972 if let Err(e) = tx.commit().await {
973 error!("Error committing transaction: {:?}", e);
974 return (
975 StatusCode::INTERNAL_SERVER_ERROR,
976 Json(json!({"error": "InternalError"})),
977 )
978 .into_response();
979 }
980 if !is_migration && !is_did_web_byod {
981 if let Err(e) =
982 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
983 {
984 warn!("Failed to sequence identity event for {}: {}", did, e);
985 }
986 if let Err(e) =
987 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
988 {
989 warn!("Failed to sequence account event for {}: {}", did, e);
990 }
991 if let Err(e) =
992 crate::api::repo::record::sequence_empty_commit_event(&state, &did).await
993 {
994 warn!("Failed to sequence commit event for {}: {}", did, e);
995 }
996 if let Err(e) = crate::api::repo::record::sequence_sync_event(
997 &state,
998 &did,
999 &commit_cid_str,
1000 Some(rev.as_ref()),
1001 )
1002 .await
1003 {
1004 warn!("Failed to sequence sync event for {}: {}", did, e);
1005 }
1006 let profile_record = json!({
1007 "$type": "app.bsky.actor.profile",
1008 "displayName": input.handle
1009 });
1010 if let Err(e) = crate::api::repo::record::create_record_internal(
1011 &state,
1012 &did,
1013 "app.bsky.actor.profile",
1014 "self",
1015 &profile_record,
1016 )
1017 .await
1018 {
1019 warn!("Failed to create default profile for {}: {}", did, e);
1020 }
1021 }
1022 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
1023 if !is_migration {
1024 if let Some(ref recipient) = verification_recipient {
1025 let verification_token = crate::auth::verification_token::generate_signup_token(
1026 &did,
1027 verification_channel,
1028 recipient,
1029 );
1030 let formatted_token =
1031 crate::auth::verification_token::format_token_for_display(&verification_token);
1032 if let Err(e) = crate::comms::enqueue_signup_verification(
1033 &state.db,
1034 user_id,
1035 verification_channel,
1036 recipient,
1037 &formatted_token,
1038 None,
1039 )
1040 .await
1041 {
1042 warn!(
1043 "Failed to enqueue signup verification notification: {:?}",
1044 e
1045 );
1046 }
1047 }
1048 } else if let Some(ref user_email) = email {
1049 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
1050 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
1051 if let Err(e) = crate::comms::enqueue_migration_verification(
1052 &state.db,
1053 user_id,
1054 user_email,
1055 &formatted_token,
1056 &hostname,
1057 )
1058 .await
1059 {
1060 warn!("Failed to enqueue migration verification email: {:?}", e);
1061 }
1062 }
1063
1064 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
1065 {
1066 Ok(m) => m,
1067 Err(e) => {
1068 error!("createAccount: Error creating access token: {:?}", e);
1069 return (
1070 StatusCode::INTERNAL_SERVER_ERROR,
1071 Json(json!({"error": "InternalError"})),
1072 )
1073 .into_response();
1074 }
1075 };
1076 let refresh_meta =
1077 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1078 Ok(m) => m,
1079 Err(e) => {
1080 error!("createAccount: Error creating refresh token: {:?}", e);
1081 return (
1082 StatusCode::INTERNAL_SERVER_ERROR,
1083 Json(json!({"error": "InternalError"})),
1084 )
1085 .into_response();
1086 }
1087 };
1088 if let Err(e) = sqlx::query!(
1089 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1090 did,
1091 access_meta.jti,
1092 refresh_meta.jti,
1093 access_meta.expires_at,
1094 refresh_meta.expires_at
1095 )
1096 .execute(&state.db)
1097 .await
1098 {
1099 error!("createAccount: Error creating session: {:?}", e);
1100 return (
1101 StatusCode::INTERNAL_SERVER_ERROR,
1102 Json(json!({"error": "InternalError"})),
1103 )
1104 .into_response();
1105 }
1106
1107 let did_doc = state.did_resolver.resolve_did_document(&did).await;
1108
1109 if is_migration {
1110 info!(
1111 "[MIGRATION] createAccount: SUCCESS - Account ready for migration did={} handle={}",
1112 did, handle
1113 );
1114 }
1115
1116 (
1117 StatusCode::OK,
1118 Json(CreateAccountOutput {
1119 handle: handle.clone(),
1120 did,
1121 did_doc,
1122 access_jwt: access_meta.token,
1123 refresh_jwt: refresh_meta.token,
1124 verification_required: !is_migration,
1125 verification_channel: verification_channel.to_string(),
1126 }),
1127 )
1128 .into_response()
1129}
1130