this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub access_jwt: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub refresh_jwt: Option<String>,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 info!("create_account called");
73 let client_ip = extract_client_ip(&headers);
74 if !state
75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
76 .await
77 {
78 warn!(ip = %client_ip, "Account creation rate limit exceeded");
79 return (
80 StatusCode::TOO_MANY_REQUESTS,
81 Json(json!({
82 "error": "RateLimitExceeded",
83 "message": "Too many account creation attempts. Please try again later."
84 })),
85 )
86 .into_response();
87 }
88
89 let migration_auth = if let Some(token) =
90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
91 {
92 if is_service_token(&token) {
93 let verifier = ServiceTokenVerifier::new();
94 match verifier
95 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
96 .await
97 {
98 Ok(claims) => {
99 debug!("Service token verified for migration: iss={}", claims.iss);
100 Some(claims.iss)
101 }
102 Err(e) => {
103 error!("Service token verification failed: {:?}", e);
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({
107 "error": "AuthenticationFailed",
108 "message": format!("Service token verification failed: {}", e)
109 })),
110 )
111 .into_response();
112 }
113 }
114 } else {
115 None
116 }
117 } else {
118 None
119 };
120
121 let is_did_web_byod = migration_auth.is_some()
122 && input
123 .did
124 .as_ref()
125 .map(|d| d.starts_with("did:web:"))
126 .unwrap_or(false);
127
128 let is_migration = migration_auth.is_some()
129 && input
130 .did
131 .as_ref()
132 .map(|d| d.starts_with("did:plc:"))
133 .unwrap_or(false);
134
135 if (is_migration || is_did_web_byod)
136 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
137 {
138 if provided_did != auth_did {
139 return (
140 StatusCode::FORBIDDEN,
141 Json(json!({
142 "error": "AuthorizationError",
143 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
144 })),
145 )
146 .into_response();
147 }
148 if is_did_web_byod {
149 info!(did = %provided_did, "Processing did:web BYOD account creation");
150 } else {
151 info!(did = %provided_did, "Processing account migration");
152 }
153 }
154
155 let hostname_for_validation =
156 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
157 let pds_suffix = format!(".{}", hostname_for_validation);
158
159 let validated_short_handle = if !input.handle.contains('.')
160 || input.handle.ends_with(&pds_suffix)
161 {
162 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
163 input
164 .handle
165 .strip_suffix(&pds_suffix)
166 .unwrap_or(&input.handle)
167 } else {
168 &input.handle
169 };
170 match crate::api::validation::validate_short_handle(handle_to_validate) {
171 Ok(h) => h,
172 Err(e) => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
176 )
177 .into_response();
178 }
179 }
180 } else {
181 if input.handle.contains(' ') || input.handle.contains('\t') {
182 return (
183 StatusCode::BAD_REQUEST,
184 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
185 )
186 .into_response();
187 }
188 for c in input.handle.chars() {
189 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
190 return (
191 StatusCode::BAD_REQUEST,
192 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
193 )
194 .into_response();
195 }
196 }
197 input.handle.to_lowercase()
198 };
199 let email: Option<String> = input
200 .email
201 .as_ref()
202 .map(|e| e.trim().to_string())
203 .filter(|e| !e.is_empty());
204 if let Some(ref email) = email
205 && !crate::api::validation::is_valid_email(email)
206 {
207 return (
208 StatusCode::BAD_REQUEST,
209 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
210 )
211 .into_response();
212 }
213 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
214 let valid_channels = ["email", "discord", "telegram", "signal"];
215 if !valid_channels.contains(&verification_channel) && !is_migration {
216 return (
217 StatusCode::BAD_REQUEST,
218 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
219 )
220 .into_response();
221 }
222 let verification_recipient = if is_migration {
223 None
224 } else {
225 Some(match verification_channel {
226 "email" => match &input.email {
227 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
228 _ => return (
229 StatusCode::BAD_REQUEST,
230 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
231 ).into_response(),
232 },
233 "discord" => match &input.discord_id {
234 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
235 _ => return (
236 StatusCode::BAD_REQUEST,
237 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
238 ).into_response(),
239 },
240 "telegram" => match &input.telegram_username {
241 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
242 _ => return (
243 StatusCode::BAD_REQUEST,
244 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
245 ).into_response(),
246 },
247 "signal" => match &input.signal_number {
248 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
249 _ => return (
250 StatusCode::BAD_REQUEST,
251 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
252 ).into_response(),
253 },
254 _ => return (
255 StatusCode::BAD_REQUEST,
256 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
257 ).into_response(),
258 })
259 };
260 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
261 let pds_endpoint = format!("https://{}", hostname);
262 let suffix = format!(".{}", hostname);
263 let handle = if input.handle.ends_with(&suffix) {
264 format!("{}.{}", validated_short_handle, hostname)
265 } else if input.handle.contains('.') {
266 validated_short_handle.clone()
267 } else {
268 format!("{}.{}", validated_short_handle, hostname)
269 };
270 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
271 if let Some(signing_key_did) = &input.signing_key {
272 let reserved = sqlx::query!(
273 r#"
274 SELECT id, private_key_bytes
275 FROM reserved_signing_keys
276 WHERE public_key_did_key = $1
277 AND used_at IS NULL
278 AND expires_at > NOW()
279 FOR UPDATE
280 "#,
281 signing_key_did
282 )
283 .fetch_optional(&state.db)
284 .await;
285 match reserved {
286 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
287 Ok(None) => {
288 return (
289 StatusCode::BAD_REQUEST,
290 Json(json!({
291 "error": "InvalidSigningKey",
292 "message": "Signing key not found, already used, or expired"
293 })),
294 )
295 .into_response();
296 }
297 Err(e) => {
298 error!("Error looking up reserved signing key: {:?}", e);
299 return (
300 StatusCode::INTERNAL_SERVER_ERROR,
301 Json(json!({"error": "InternalError"})),
302 )
303 .into_response();
304 }
305 }
306 } else {
307 let secret_key = SecretKey::random(&mut OsRng);
308 (secret_key.to_bytes().to_vec(), None)
309 };
310 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
311 Ok(k) => k,
312 Err(e) => {
313 error!("Error creating signing key: {:?}", e);
314 return (
315 StatusCode::INTERNAL_SERVER_ERROR,
316 Json(json!({"error": "InternalError"})),
317 )
318 .into_response();
319 }
320 };
321 let did_type = input.did_type.as_deref().unwrap_or("plc");
322 let did = match did_type {
323 "web" => {
324 let subdomain_host = format!("{}.{}", input.handle, hostname);
325 let encoded_subdomain = subdomain_host.replace(':', "%3A");
326 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
327 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
328 self_hosted_did
329 }
330 "web-external" => {
331 let d = match &input.did {
332 Some(d) if !d.trim().is_empty() => d,
333 _ => {
334 return (
335 StatusCode::BAD_REQUEST,
336 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
337 )
338 .into_response();
339 }
340 };
341 if !d.starts_with("did:web:") {
342 return (
343 StatusCode::BAD_REQUEST,
344 Json(
345 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
346 ),
347 )
348 .into_response();
349 }
350 if !is_did_web_byod
351 && let Err(e) =
352 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
353 {
354 return (
355 StatusCode::BAD_REQUEST,
356 Json(json!({"error": "InvalidDid", "message": e})),
357 )
358 .into_response();
359 }
360 info!(did = %d, "Creating external did:web account");
361 d.clone()
362 }
363 _ => {
364 if let Some(d) = &input.did {
365 if d.starts_with("did:plc:") && is_migration {
366 info!(did = %d, "Migration with existing did:plc");
367 d.clone()
368 } else if d.starts_with("did:web:") {
369 if !is_did_web_byod
370 && let Err(e) = verify_did_web(
371 d,
372 &hostname,
373 &input.handle,
374 input.signing_key.as_deref(),
375 )
376 .await
377 {
378 return (
379 StatusCode::BAD_REQUEST,
380 Json(json!({"error": "InvalidDid", "message": e})),
381 )
382 .into_response();
383 }
384 d.clone()
385 } else if !d.trim().is_empty() {
386 return (
387 StatusCode::BAD_REQUEST,
388 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
389 )
390 .into_response();
391 } else {
392 let rotation_key = std::env::var("PLC_ROTATION_KEY")
393 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
394 let genesis_result = match create_genesis_operation(
395 &signing_key,
396 &rotation_key,
397 &handle,
398 &pds_endpoint,
399 ) {
400 Ok(r) => r,
401 Err(e) => {
402 error!("Error creating PLC genesis operation: {:?}", e);
403 return (
404 StatusCode::INTERNAL_SERVER_ERROR,
405 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
406 )
407 .into_response();
408 }
409 };
410 let plc_client = PlcClient::new(None);
411 if let Err(e) = plc_client
412 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
413 .await
414 {
415 error!("Failed to submit PLC genesis operation: {:?}", e);
416 return (
417 StatusCode::BAD_GATEWAY,
418 Json(json!({
419 "error": "UpstreamError",
420 "message": format!("Failed to register DID with PLC directory: {}", e)
421 })),
422 )
423 .into_response();
424 }
425 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
426 genesis_result.did
427 }
428 } else {
429 let rotation_key = std::env::var("PLC_ROTATION_KEY")
430 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
431 let genesis_result = match create_genesis_operation(
432 &signing_key,
433 &rotation_key,
434 &handle,
435 &pds_endpoint,
436 ) {
437 Ok(r) => r,
438 Err(e) => {
439 error!("Error creating PLC genesis operation: {:?}", e);
440 return (
441 StatusCode::INTERNAL_SERVER_ERROR,
442 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
443 )
444 .into_response();
445 }
446 };
447 let plc_client = PlcClient::new(None);
448 if let Err(e) = plc_client
449 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
450 .await
451 {
452 error!("Failed to submit PLC genesis operation: {:?}", e);
453 return (
454 StatusCode::BAD_GATEWAY,
455 Json(json!({
456 "error": "UpstreamError",
457 "message": format!("Failed to register DID with PLC directory: {}", e)
458 })),
459 )
460 .into_response();
461 }
462 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
463 genesis_result.did
464 }
465 }
466 };
467 let mut tx = match state.db.begin().await {
468 Ok(tx) => tx,
469 Err(e) => {
470 error!("Error starting transaction: {:?}", e);
471 return (
472 StatusCode::INTERNAL_SERVER_ERROR,
473 Json(json!({"error": "InternalError"})),
474 )
475 .into_response();
476 }
477 };
478 if is_migration {
479 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
480 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
481 .bind(&did)
482 .fetch_optional(&mut *tx)
483 .await
484 .unwrap_or(None);
485 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
486 if deactivated_at.is_some() {
487 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
488 let update_result: Result<_, sqlx::Error> =
489 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
490 .bind(&handle)
491 .bind(account_id)
492 .execute(&mut *tx)
493 .await;
494 if let Err(e) = update_result {
495 if let Some(db_err) = e.as_database_error()
496 && db_err
497 .constraint()
498 .map(|c| c.contains("handle"))
499 .unwrap_or(false)
500 {
501 return (
502 StatusCode::BAD_REQUEST,
503 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
504 )
505 .into_response();
506 }
507 error!("Error reactivating account: {:?}", e);
508 return (
509 StatusCode::INTERNAL_SERVER_ERROR,
510 Json(json!({"error": "InternalError"})),
511 )
512 .into_response();
513 }
514 if let Err(e) = tx.commit().await {
515 error!("Error committing reactivation: {:?}", e);
516 return (
517 StatusCode::INTERNAL_SERVER_ERROR,
518 Json(json!({"error": "InternalError"})),
519 )
520 .into_response();
521 }
522 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
523 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
524 )
525 .bind(account_id)
526 .fetch_optional(&state.db)
527 .await
528 .unwrap_or(None);
529 let secret_key_bytes = match key_row {
530 Some((key_bytes, encryption_version)) => {
531 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
532 Ok(k) => k,
533 Err(e) => {
534 error!("Error decrypting key for reactivated account: {:?}", e);
535 return (
536 StatusCode::INTERNAL_SERVER_ERROR,
537 Json(json!({"error": "InternalError"})),
538 )
539 .into_response();
540 }
541 }
542 }
543 None => {
544 error!("No signing key found for reactivated account");
545 return (
546 StatusCode::INTERNAL_SERVER_ERROR,
547 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
548 )
549 .into_response();
550 }
551 };
552 let access_meta =
553 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
554 Ok(m) => m,
555 Err(e) => {
556 error!("Error creating access token: {:?}", e);
557 return (
558 StatusCode::INTERNAL_SERVER_ERROR,
559 Json(json!({"error": "InternalError"})),
560 )
561 .into_response();
562 }
563 };
564 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
565 &did,
566 &secret_key_bytes,
567 ) {
568 Ok(m) => m,
569 Err(e) => {
570 error!("Error creating refresh token: {:?}", e);
571 return (
572 StatusCode::INTERNAL_SERVER_ERROR,
573 Json(json!({"error": "InternalError"})),
574 )
575 .into_response();
576 }
577 };
578 let session_result: Result<_, sqlx::Error> = sqlx::query(
579 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
580 )
581 .bind(&did)
582 .bind(&access_meta.jti)
583 .bind(&refresh_meta.jti)
584 .bind(access_meta.expires_at)
585 .bind(refresh_meta.expires_at)
586 .execute(&state.db)
587 .await;
588 if let Err(e) = session_result {
589 error!("Error creating session: {:?}", e);
590 return (
591 StatusCode::INTERNAL_SERVER_ERROR,
592 Json(json!({"error": "InternalError"})),
593 )
594 .into_response();
595 }
596 return (
597 StatusCode::OK,
598 Json(CreateAccountOutput {
599 handle: handle.clone(),
600 did,
601 access_jwt: Some(access_meta.token),
602 refresh_jwt: Some(refresh_meta.token),
603 verification_required: false,
604 verification_channel: "email".to_string(),
605 }),
606 )
607 .into_response();
608 } else {
609 return (
610 StatusCode::BAD_REQUEST,
611 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
612 )
613 .into_response();
614 }
615 }
616 }
617 let exists_result: Option<(i32,)> =
618 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
619 .bind(&handle)
620 .fetch_optional(&mut *tx)
621 .await
622 .unwrap_or(None);
623 if exists_result.is_some() {
624 return (
625 StatusCode::BAD_REQUEST,
626 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
627 )
628 .into_response();
629 }
630 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
631 .map(|v| v == "true" || v == "1")
632 .unwrap_or(false);
633 if invite_code_required
634 && input
635 .invite_code
636 .as_ref()
637 .map(|c| c.trim().is_empty())
638 .unwrap_or(true)
639 {
640 return (
641 StatusCode::BAD_REQUEST,
642 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
643 )
644 .into_response();
645 }
646 if let Some(code) = &input.invite_code
647 && !code.trim().is_empty()
648 {
649 let invite_query = sqlx::query!(
650 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
651 code
652 )
653 .fetch_optional(&mut *tx)
654 .await;
655 match invite_query {
656 Ok(Some(row)) => {
657 if row.available_uses <= 0 {
658 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
659 }
660 let update_invite = sqlx::query!(
661 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
662 code
663 )
664 .execute(&mut *tx)
665 .await;
666 if let Err(e) = update_invite {
667 error!("Error updating invite code: {:?}", e);
668 return (
669 StatusCode::INTERNAL_SERVER_ERROR,
670 Json(json!({"error": "InternalError"})),
671 )
672 .into_response();
673 }
674 }
675 Ok(None) => {
676 return (
677 StatusCode::BAD_REQUEST,
678 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
679 )
680 .into_response();
681 }
682 Err(e) => {
683 error!("Error checking invite code: {:?}", e);
684 return (
685 StatusCode::INTERNAL_SERVER_ERROR,
686 Json(json!({"error": "InternalError"})),
687 )
688 .into_response();
689 }
690 }
691 }
692 if let Err(e) = validate_password(&input.password) {
693 return (
694 StatusCode::BAD_REQUEST,
695 Json(json!({
696 "error": "InvalidPassword",
697 "message": e.to_string()
698 })),
699 )
700 .into_response();
701 }
702
703 let password_hash = match hash(&input.password, DEFAULT_COST) {
704 Ok(h) => h,
705 Err(e) => {
706 error!("Error hashing password: {:?}", e);
707 return (
708 StatusCode::INTERNAL_SERVER_ERROR,
709 Json(json!({"error": "InternalError"})),
710 )
711 .into_response();
712 }
713 };
714 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
715 .fetch_one(&mut *tx)
716 .await
717 .map(|c| c.unwrap_or(0) == 0)
718 .unwrap_or(false);
719 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
720 Some(chrono::Utc::now())
721 } else {
722 None
723 };
724 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
725 r#"INSERT INTO users (
726 handle, email, did, password_hash,
727 preferred_comms_channel,
728 discord_id, telegram_username, signal_number,
729 is_admin, deactivated_at, email_verified
730 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
731 )
732 .bind(&handle)
733 .bind(&email)
734 .bind(&did)
735 .bind(&password_hash)
736 .bind(verification_channel)
737 .bind(
738 input
739 .discord_id
740 .as_deref()
741 .map(|s| s.trim())
742 .filter(|s| !s.is_empty()),
743 )
744 .bind(
745 input
746 .telegram_username
747 .as_deref()
748 .map(|s| s.trim())
749 .filter(|s| !s.is_empty()),
750 )
751 .bind(
752 input
753 .signal_number
754 .as_deref()
755 .map(|s| s.trim())
756 .filter(|s| !s.is_empty()),
757 )
758 .bind(is_first_user)
759 .bind(deactivated_at)
760 .bind(false)
761 .fetch_one(&mut *tx)
762 .await;
763 let user_id = match user_insert {
764 Ok((id,)) => id,
765 Err(e) => {
766 if let Some(db_err) = e.as_database_error()
767 && db_err.code().as_deref() == Some("23505")
768 {
769 let constraint = db_err.constraint().unwrap_or("");
770 if constraint.contains("handle") || constraint.contains("users_handle") {
771 return (
772 StatusCode::BAD_REQUEST,
773 Json(json!({
774 "error": "HandleNotAvailable",
775 "message": "Handle already taken"
776 })),
777 )
778 .into_response();
779 } else if constraint.contains("email") || constraint.contains("users_email") {
780 return (
781 StatusCode::BAD_REQUEST,
782 Json(json!({
783 "error": "InvalidEmail",
784 "message": "Email already registered"
785 })),
786 )
787 .into_response();
788 } else if constraint.contains("did") || constraint.contains("users_did") {
789 return (
790 StatusCode::BAD_REQUEST,
791 Json(json!({
792 "error": "AccountAlreadyExists",
793 "message": "An account with this DID already exists"
794 })),
795 )
796 .into_response();
797 }
798 }
799 error!("Error inserting user: {:?}", e);
800 return (
801 StatusCode::INTERNAL_SERVER_ERROR,
802 Json(json!({"error": "InternalError"})),
803 )
804 .into_response();
805 }
806 };
807
808 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
809 Ok(enc) => enc,
810 Err(e) => {
811 error!("Error encrypting user key: {:?}", e);
812 return (
813 StatusCode::INTERNAL_SERVER_ERROR,
814 Json(json!({"error": "InternalError"})),
815 )
816 .into_response();
817 }
818 };
819 let key_insert = sqlx::query!(
820 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
821 user_id,
822 &encrypted_key_bytes[..],
823 crate::config::ENCRYPTION_VERSION
824 )
825 .execute(&mut *tx)
826 .await;
827 if let Err(e) = key_insert {
828 error!("Error inserting user key: {:?}", e);
829 return (
830 StatusCode::INTERNAL_SERVER_ERROR,
831 Json(json!({"error": "InternalError"})),
832 )
833 .into_response();
834 }
835 if let Some(key_id) = reserved_key_id {
836 let mark_used = sqlx::query!(
837 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
838 key_id
839 )
840 .execute(&mut *tx)
841 .await;
842 if let Err(e) = mark_used {
843 error!("Error marking reserved key as used: {:?}", e);
844 return (
845 StatusCode::INTERNAL_SERVER_ERROR,
846 Json(json!({"error": "InternalError"})),
847 )
848 .into_response();
849 }
850 }
851 let mst = Mst::new(Arc::new(state.block_store.clone()));
852 let mst_root = match mst.persist().await {
853 Ok(c) => c,
854 Err(e) => {
855 error!("Error persisting MST: {:?}", e);
856 return (
857 StatusCode::INTERNAL_SERVER_ERROR,
858 Json(json!({"error": "InternalError"})),
859 )
860 .into_response();
861 }
862 };
863 let rev = Tid::now(LimitedU32::MIN);
864 let (commit_bytes, _sig) =
865 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
866 Ok(result) => result,
867 Err(e) => {
868 error!("Error creating genesis commit: {:?}", e);
869 return (
870 StatusCode::INTERNAL_SERVER_ERROR,
871 Json(json!({"error": "InternalError"})),
872 )
873 .into_response();
874 }
875 };
876 let commit_cid = match state.block_store.put(&commit_bytes).await {
877 Ok(c) => c,
878 Err(e) => {
879 error!("Error saving genesis commit: {:?}", e);
880 return (
881 StatusCode::INTERNAL_SERVER_ERROR,
882 Json(json!({"error": "InternalError"})),
883 )
884 .into_response();
885 }
886 };
887 let commit_cid_str = commit_cid.to_string();
888 let repo_insert = sqlx::query!(
889 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
890 user_id,
891 commit_cid_str
892 )
893 .execute(&mut *tx)
894 .await;
895 if let Err(e) = repo_insert {
896 error!("Error initializing repo: {:?}", e);
897 return (
898 StatusCode::INTERNAL_SERVER_ERROR,
899 Json(json!({"error": "InternalError"})),
900 )
901 .into_response();
902 }
903 if let Some(code) = &input.invite_code
904 && !code.trim().is_empty()
905 {
906 let use_insert = sqlx::query!(
907 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
908 code,
909 user_id
910 )
911 .execute(&mut *tx)
912 .await;
913 if let Err(e) = use_insert {
914 error!("Error recording invite usage: {:?}", e);
915 return (
916 StatusCode::INTERNAL_SERVER_ERROR,
917 Json(json!({"error": "InternalError"})),
918 )
919 .into_response();
920 }
921 }
922 if let Err(e) = tx.commit().await {
923 error!("Error committing transaction: {:?}", e);
924 return (
925 StatusCode::INTERNAL_SERVER_ERROR,
926 Json(json!({"error": "InternalError"})),
927 )
928 .into_response();
929 }
930 if !is_migration && !is_did_web_byod {
931 if let Err(e) =
932 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
933 {
934 warn!("Failed to sequence identity event for {}: {}", did, e);
935 }
936 if let Err(e) =
937 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
938 {
939 warn!("Failed to sequence account event for {}: {}", did, e);
940 }
941 let profile_record = json!({
942 "$type": "app.bsky.actor.profile",
943 "displayName": input.handle
944 });
945 if let Err(e) = crate::api::repo::record::create_record_internal(
946 &state,
947 &did,
948 "app.bsky.actor.profile",
949 "self",
950 &profile_record,
951 )
952 .await
953 {
954 warn!("Failed to create default profile for {}: {}", did, e);
955 }
956 }
957 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
958 if !is_migration {
959 if let Some(ref recipient) = verification_recipient {
960 let verification_token = crate::auth::verification_token::generate_signup_token(
961 &did,
962 verification_channel,
963 recipient,
964 );
965 let formatted_token =
966 crate::auth::verification_token::format_token_for_display(&verification_token);
967 if let Err(e) = crate::comms::enqueue_signup_verification(
968 &state.db,
969 user_id,
970 verification_channel,
971 recipient,
972 &formatted_token,
973 None,
974 )
975 .await
976 {
977 warn!(
978 "Failed to enqueue signup verification notification: {:?}",
979 e
980 );
981 }
982 }
983 } else if let Some(ref user_email) = email {
984 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
985 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
986 if let Err(e) = crate::comms::enqueue_migration_verification(
987 &state.db,
988 user_id,
989 user_email,
990 &formatted_token,
991 &hostname,
992 )
993 .await
994 {
995 warn!("Failed to enqueue migration verification email: {:?}", e);
996 }
997 }
998
999 let (access_jwt, refresh_jwt) = if is_migration {
1000 let access_meta =
1001 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
1002 Ok(m) => m,
1003 Err(e) => {
1004 error!("Error creating access token for migration: {:?}", e);
1005 return (
1006 StatusCode::INTERNAL_SERVER_ERROR,
1007 Json(json!({"error": "InternalError"})),
1008 )
1009 .into_response();
1010 }
1011 };
1012 let refresh_meta =
1013 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1014 Ok(m) => m,
1015 Err(e) => {
1016 error!("Error creating refresh token for migration: {:?}", e);
1017 return (
1018 StatusCode::INTERNAL_SERVER_ERROR,
1019 Json(json!({"error": "InternalError"})),
1020 )
1021 .into_response();
1022 }
1023 };
1024 if let Err(e) = sqlx::query!(
1025 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1026 did,
1027 access_meta.jti,
1028 refresh_meta.jti,
1029 access_meta.expires_at,
1030 refresh_meta.expires_at
1031 )
1032 .execute(&state.db)
1033 .await
1034 {
1035 error!("Error creating session for migration: {:?}", e);
1036 return (
1037 StatusCode::INTERNAL_SERVER_ERROR,
1038 Json(json!({"error": "InternalError"})),
1039 )
1040 .into_response();
1041 }
1042 (Some(access_meta.token), Some(refresh_meta.token))
1043 } else {
1044 (None, None)
1045 };
1046
1047 (
1048 StatusCode::OK,
1049 Json(CreateAccountOutput {
1050 handle: handle.clone(),
1051 did,
1052 access_jwt,
1053 refresh_jwt,
1054 verification_required: !is_migration,
1055 verification_channel: verification_channel.to_string(),
1056 }),
1057 )
1058 .into_response()
1059}