this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub access_jwt: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub refresh_jwt: Option<String>,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 info!("create_account called");
73 let client_ip = extract_client_ip(&headers);
74 if !state
75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
76 .await
77 {
78 warn!(ip = %client_ip, "Account creation rate limit exceeded");
79 return (
80 StatusCode::TOO_MANY_REQUESTS,
81 Json(json!({
82 "error": "RateLimitExceeded",
83 "message": "Too many account creation attempts. Please try again later."
84 })),
85 )
86 .into_response();
87 }
88
89 let migration_auth = if let Some(token) =
90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
91 {
92 if is_service_token(&token) {
93 let verifier = ServiceTokenVerifier::new();
94 match verifier
95 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
96 .await
97 {
98 Ok(claims) => {
99 debug!("Service token verified for migration: iss={}", claims.iss);
100 Some(claims.iss)
101 }
102 Err(e) => {
103 error!("Service token verification failed: {:?}", e);
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({
107 "error": "AuthenticationFailed",
108 "message": format!("Service token verification failed: {}", e)
109 })),
110 )
111 .into_response();
112 }
113 }
114 } else {
115 None
116 }
117 } else {
118 None
119 };
120
121 let is_did_web_byod = migration_auth.is_some()
122 && input
123 .did
124 .as_ref()
125 .map(|d| d.starts_with("did:web:"))
126 .unwrap_or(false);
127
128 let is_migration = migration_auth.is_some()
129 && input
130 .did
131 .as_ref()
132 .map(|d| d.starts_with("did:plc:"))
133 .unwrap_or(false);
134
135 if (is_migration || is_did_web_byod)
136 && let (Some(provided_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
137 {
138 if provided_did != auth_did {
139 return (
140 StatusCode::FORBIDDEN,
141 Json(json!({
142 "error": "AuthorizationError",
143 "message": format!("Service token issuer {} does not match DID {}", auth_did, provided_did)
144 })),
145 )
146 .into_response();
147 }
148 if is_did_web_byod {
149 info!(did = %provided_did, "Processing did:web BYOD account creation");
150 } else {
151 info!(did = %provided_did, "Processing account migration");
152 }
153 }
154
155 let hostname_for_validation =
156 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
157 let pds_suffix = format!(".{}", hostname_for_validation);
158
159 let validated_short_handle = if !input.handle.contains('.')
160 || input.handle.ends_with(&pds_suffix)
161 {
162 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
163 input
164 .handle
165 .strip_suffix(&pds_suffix)
166 .unwrap_or(&input.handle)
167 } else {
168 &input.handle
169 };
170 match crate::api::validation::validate_short_handle(handle_to_validate) {
171 Ok(h) => h,
172 Err(e) => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
176 )
177 .into_response();
178 }
179 }
180 } else {
181 if input.handle.contains(' ') || input.handle.contains('\t') {
182 return (
183 StatusCode::BAD_REQUEST,
184 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
185 )
186 .into_response();
187 }
188 for c in input.handle.chars() {
189 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
190 return (
191 StatusCode::BAD_REQUEST,
192 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
193 )
194 .into_response();
195 }
196 }
197 let handle_lower = input.handle.to_lowercase();
198 if crate::moderation::has_explicit_slur(&handle_lower) {
199 return (
200 StatusCode::BAD_REQUEST,
201 Json(json!({"error": "InvalidHandle", "message": "Inappropriate language in handle"})),
202 )
203 .into_response();
204 }
205 handle_lower
206 };
207 let email: Option<String> = input
208 .email
209 .as_ref()
210 .map(|e| e.trim().to_string())
211 .filter(|e| !e.is_empty());
212 if let Some(ref email) = email
213 && !crate::api::validation::is_valid_email(email)
214 {
215 return (
216 StatusCode::BAD_REQUEST,
217 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
218 )
219 .into_response();
220 }
221 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
222 let valid_channels = ["email", "discord", "telegram", "signal"];
223 if !valid_channels.contains(&verification_channel) && !is_migration {
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
227 )
228 .into_response();
229 }
230 let verification_recipient = if is_migration {
231 None
232 } else {
233 Some(match verification_channel {
234 "email" => match &input.email {
235 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
236 _ => return (
237 StatusCode::BAD_REQUEST,
238 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
239 ).into_response(),
240 },
241 "discord" => match &input.discord_id {
242 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
243 _ => return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
246 ).into_response(),
247 },
248 "telegram" => match &input.telegram_username {
249 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
250 _ => return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
253 ).into_response(),
254 },
255 "signal" => match &input.signal_number {
256 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
257 _ => return (
258 StatusCode::BAD_REQUEST,
259 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
260 ).into_response(),
261 },
262 _ => return (
263 StatusCode::BAD_REQUEST,
264 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
265 ).into_response(),
266 })
267 };
268 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
269 let pds_endpoint = format!("https://{}", hostname);
270 let suffix = format!(".{}", hostname);
271 let handle = if input.handle.ends_with(&suffix) {
272 format!("{}.{}", validated_short_handle, hostname)
273 } else if input.handle.contains('.') {
274 validated_short_handle.clone()
275 } else {
276 format!("{}.{}", validated_short_handle, hostname)
277 };
278 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
279 if let Some(signing_key_did) = &input.signing_key {
280 let reserved = sqlx::query!(
281 r#"
282 SELECT id, private_key_bytes
283 FROM reserved_signing_keys
284 WHERE public_key_did_key = $1
285 AND used_at IS NULL
286 AND expires_at > NOW()
287 FOR UPDATE
288 "#,
289 signing_key_did
290 )
291 .fetch_optional(&state.db)
292 .await;
293 match reserved {
294 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
295 Ok(None) => {
296 return (
297 StatusCode::BAD_REQUEST,
298 Json(json!({
299 "error": "InvalidSigningKey",
300 "message": "Signing key not found, already used, or expired"
301 })),
302 )
303 .into_response();
304 }
305 Err(e) => {
306 error!("Error looking up reserved signing key: {:?}", e);
307 return (
308 StatusCode::INTERNAL_SERVER_ERROR,
309 Json(json!({"error": "InternalError"})),
310 )
311 .into_response();
312 }
313 }
314 } else {
315 let secret_key = SecretKey::random(&mut OsRng);
316 (secret_key.to_bytes().to_vec(), None)
317 };
318 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
319 Ok(k) => k,
320 Err(e) => {
321 error!("Error creating signing key: {:?}", e);
322 return (
323 StatusCode::INTERNAL_SERVER_ERROR,
324 Json(json!({"error": "InternalError"})),
325 )
326 .into_response();
327 }
328 };
329 let did_type = input.did_type.as_deref().unwrap_or("plc");
330 let did = match did_type {
331 "web" => {
332 let subdomain_host = format!("{}.{}", input.handle, hostname);
333 let encoded_subdomain = subdomain_host.replace(':', "%3A");
334 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
335 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
336 self_hosted_did
337 }
338 "web-external" => {
339 let d = match &input.did {
340 Some(d) if !d.trim().is_empty() => d,
341 _ => {
342 return (
343 StatusCode::BAD_REQUEST,
344 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
345 )
346 .into_response();
347 }
348 };
349 if !d.starts_with("did:web:") {
350 return (
351 StatusCode::BAD_REQUEST,
352 Json(
353 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
354 ),
355 )
356 .into_response();
357 }
358 if !is_did_web_byod
359 && let Err(e) =
360 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
361 {
362 return (
363 StatusCode::BAD_REQUEST,
364 Json(json!({"error": "InvalidDid", "message": e})),
365 )
366 .into_response();
367 }
368 info!(did = %d, "Creating external did:web account");
369 d.clone()
370 }
371 _ => {
372 if let Some(d) = &input.did {
373 if d.starts_with("did:plc:") && is_migration {
374 info!(did = %d, "Migration with existing did:plc");
375 d.clone()
376 } else if d.starts_with("did:web:") {
377 if !is_did_web_byod
378 && let Err(e) = verify_did_web(
379 d,
380 &hostname,
381 &input.handle,
382 input.signing_key.as_deref(),
383 )
384 .await
385 {
386 return (
387 StatusCode::BAD_REQUEST,
388 Json(json!({"error": "InvalidDid", "message": e})),
389 )
390 .into_response();
391 }
392 d.clone()
393 } else if !d.trim().is_empty() {
394 return (
395 StatusCode::BAD_REQUEST,
396 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
397 )
398 .into_response();
399 } else {
400 let rotation_key = std::env::var("PLC_ROTATION_KEY")
401 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
402 let genesis_result = match create_genesis_operation(
403 &signing_key,
404 &rotation_key,
405 &handle,
406 &pds_endpoint,
407 ) {
408 Ok(r) => r,
409 Err(e) => {
410 error!("Error creating PLC genesis operation: {:?}", e);
411 return (
412 StatusCode::INTERNAL_SERVER_ERROR,
413 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
414 )
415 .into_response();
416 }
417 };
418 let plc_client = PlcClient::new(None);
419 if let Err(e) = plc_client
420 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
421 .await
422 {
423 error!("Failed to submit PLC genesis operation: {:?}", e);
424 return (
425 StatusCode::BAD_GATEWAY,
426 Json(json!({
427 "error": "UpstreamError",
428 "message": format!("Failed to register DID with PLC directory: {}", e)
429 })),
430 )
431 .into_response();
432 }
433 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
434 genesis_result.did
435 }
436 } else {
437 let rotation_key = std::env::var("PLC_ROTATION_KEY")
438 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
439 let genesis_result = match create_genesis_operation(
440 &signing_key,
441 &rotation_key,
442 &handle,
443 &pds_endpoint,
444 ) {
445 Ok(r) => r,
446 Err(e) => {
447 error!("Error creating PLC genesis operation: {:?}", e);
448 return (
449 StatusCode::INTERNAL_SERVER_ERROR,
450 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
451 )
452 .into_response();
453 }
454 };
455 let plc_client = PlcClient::new(None);
456 if let Err(e) = plc_client
457 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
458 .await
459 {
460 error!("Failed to submit PLC genesis operation: {:?}", e);
461 return (
462 StatusCode::BAD_GATEWAY,
463 Json(json!({
464 "error": "UpstreamError",
465 "message": format!("Failed to register DID with PLC directory: {}", e)
466 })),
467 )
468 .into_response();
469 }
470 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
471 genesis_result.did
472 }
473 }
474 };
475 let mut tx = match state.db.begin().await {
476 Ok(tx) => tx,
477 Err(e) => {
478 error!("Error starting transaction: {:?}", e);
479 return (
480 StatusCode::INTERNAL_SERVER_ERROR,
481 Json(json!({"error": "InternalError"})),
482 )
483 .into_response();
484 }
485 };
486 if is_migration {
487 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
488 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
489 .bind(&did)
490 .fetch_optional(&mut *tx)
491 .await
492 .unwrap_or(None);
493 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
494 if deactivated_at.is_some() {
495 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
496 let update_result: Result<_, sqlx::Error> =
497 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
498 .bind(&handle)
499 .bind(account_id)
500 .execute(&mut *tx)
501 .await;
502 if let Err(e) = update_result {
503 if let Some(db_err) = e.as_database_error()
504 && db_err
505 .constraint()
506 .map(|c| c.contains("handle"))
507 .unwrap_or(false)
508 {
509 return (
510 StatusCode::BAD_REQUEST,
511 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
512 )
513 .into_response();
514 }
515 error!("Error reactivating account: {:?}", e);
516 return (
517 StatusCode::INTERNAL_SERVER_ERROR,
518 Json(json!({"error": "InternalError"})),
519 )
520 .into_response();
521 }
522 if let Err(e) = tx.commit().await {
523 error!("Error committing reactivation: {:?}", e);
524 return (
525 StatusCode::INTERNAL_SERVER_ERROR,
526 Json(json!({"error": "InternalError"})),
527 )
528 .into_response();
529 }
530 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
531 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
532 )
533 .bind(account_id)
534 .fetch_optional(&state.db)
535 .await
536 .unwrap_or(None);
537 let secret_key_bytes = match key_row {
538 Some((key_bytes, encryption_version)) => {
539 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
540 Ok(k) => k,
541 Err(e) => {
542 error!("Error decrypting key for reactivated account: {:?}", e);
543 return (
544 StatusCode::INTERNAL_SERVER_ERROR,
545 Json(json!({"error": "InternalError"})),
546 )
547 .into_response();
548 }
549 }
550 }
551 None => {
552 error!("No signing key found for reactivated account");
553 return (
554 StatusCode::INTERNAL_SERVER_ERROR,
555 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
556 )
557 .into_response();
558 }
559 };
560 let access_meta =
561 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
562 Ok(m) => m,
563 Err(e) => {
564 error!("Error creating access token: {:?}", e);
565 return (
566 StatusCode::INTERNAL_SERVER_ERROR,
567 Json(json!({"error": "InternalError"})),
568 )
569 .into_response();
570 }
571 };
572 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
573 &did,
574 &secret_key_bytes,
575 ) {
576 Ok(m) => m,
577 Err(e) => {
578 error!("Error creating refresh token: {:?}", e);
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError"})),
582 )
583 .into_response();
584 }
585 };
586 let session_result: Result<_, sqlx::Error> = sqlx::query(
587 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
588 )
589 .bind(&did)
590 .bind(&access_meta.jti)
591 .bind(&refresh_meta.jti)
592 .bind(access_meta.expires_at)
593 .bind(refresh_meta.expires_at)
594 .execute(&state.db)
595 .await;
596 if let Err(e) = session_result {
597 error!("Error creating session: {:?}", e);
598 return (
599 StatusCode::INTERNAL_SERVER_ERROR,
600 Json(json!({"error": "InternalError"})),
601 )
602 .into_response();
603 }
604 return (
605 StatusCode::OK,
606 Json(CreateAccountOutput {
607 handle: handle.clone(),
608 did,
609 access_jwt: Some(access_meta.token),
610 refresh_jwt: Some(refresh_meta.token),
611 verification_required: false,
612 verification_channel: "email".to_string(),
613 }),
614 )
615 .into_response();
616 } else {
617 return (
618 StatusCode::BAD_REQUEST,
619 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
620 )
621 .into_response();
622 }
623 }
624 }
625 let exists_result: Option<(i32,)> =
626 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
627 .bind(&handle)
628 .fetch_optional(&mut *tx)
629 .await
630 .unwrap_or(None);
631 if exists_result.is_some() {
632 return (
633 StatusCode::BAD_REQUEST,
634 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
635 )
636 .into_response();
637 }
638 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
639 .map(|v| v == "true" || v == "1")
640 .unwrap_or(false);
641 if invite_code_required
642 && input
643 .invite_code
644 .as_ref()
645 .map(|c| c.trim().is_empty())
646 .unwrap_or(true)
647 {
648 return (
649 StatusCode::BAD_REQUEST,
650 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
651 )
652 .into_response();
653 }
654 if let Some(code) = &input.invite_code
655 && !code.trim().is_empty()
656 {
657 let invite_query = sqlx::query!(
658 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
659 code
660 )
661 .fetch_optional(&mut *tx)
662 .await;
663 match invite_query {
664 Ok(Some(row)) => {
665 if row.available_uses <= 0 {
666 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
667 }
668 let update_invite = sqlx::query!(
669 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
670 code
671 )
672 .execute(&mut *tx)
673 .await;
674 if let Err(e) = update_invite {
675 error!("Error updating invite code: {:?}", e);
676 return (
677 StatusCode::INTERNAL_SERVER_ERROR,
678 Json(json!({"error": "InternalError"})),
679 )
680 .into_response();
681 }
682 }
683 Ok(None) => {
684 return (
685 StatusCode::BAD_REQUEST,
686 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
687 )
688 .into_response();
689 }
690 Err(e) => {
691 error!("Error checking invite code: {:?}", e);
692 return (
693 StatusCode::INTERNAL_SERVER_ERROR,
694 Json(json!({"error": "InternalError"})),
695 )
696 .into_response();
697 }
698 }
699 }
700 if let Err(e) = validate_password(&input.password) {
701 return (
702 StatusCode::BAD_REQUEST,
703 Json(json!({
704 "error": "InvalidPassword",
705 "message": e.to_string()
706 })),
707 )
708 .into_response();
709 }
710
711 let password_hash = match hash(&input.password, DEFAULT_COST) {
712 Ok(h) => h,
713 Err(e) => {
714 error!("Error hashing password: {:?}", e);
715 return (
716 StatusCode::INTERNAL_SERVER_ERROR,
717 Json(json!({"error": "InternalError"})),
718 )
719 .into_response();
720 }
721 };
722 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
723 .fetch_one(&mut *tx)
724 .await
725 .map(|c| c.unwrap_or(0) == 0)
726 .unwrap_or(false);
727 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration || is_did_web_byod {
728 Some(chrono::Utc::now())
729 } else {
730 None
731 };
732 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
733 r#"INSERT INTO users (
734 handle, email, did, password_hash,
735 preferred_comms_channel,
736 discord_id, telegram_username, signal_number,
737 is_admin, deactivated_at, email_verified
738 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
739 )
740 .bind(&handle)
741 .bind(&email)
742 .bind(&did)
743 .bind(&password_hash)
744 .bind(verification_channel)
745 .bind(
746 input
747 .discord_id
748 .as_deref()
749 .map(|s| s.trim())
750 .filter(|s| !s.is_empty()),
751 )
752 .bind(
753 input
754 .telegram_username
755 .as_deref()
756 .map(|s| s.trim())
757 .filter(|s| !s.is_empty()),
758 )
759 .bind(
760 input
761 .signal_number
762 .as_deref()
763 .map(|s| s.trim())
764 .filter(|s| !s.is_empty()),
765 )
766 .bind(is_first_user)
767 .bind(deactivated_at)
768 .bind(false)
769 .fetch_one(&mut *tx)
770 .await;
771 let user_id = match user_insert {
772 Ok((id,)) => id,
773 Err(e) => {
774 if let Some(db_err) = e.as_database_error()
775 && db_err.code().as_deref() == Some("23505")
776 {
777 let constraint = db_err.constraint().unwrap_or("");
778 if constraint.contains("handle") || constraint.contains("users_handle") {
779 return (
780 StatusCode::BAD_REQUEST,
781 Json(json!({
782 "error": "HandleNotAvailable",
783 "message": "Handle already taken"
784 })),
785 )
786 .into_response();
787 } else if constraint.contains("email") || constraint.contains("users_email") {
788 return (
789 StatusCode::BAD_REQUEST,
790 Json(json!({
791 "error": "InvalidEmail",
792 "message": "Email already registered"
793 })),
794 )
795 .into_response();
796 } else if constraint.contains("did") || constraint.contains("users_did") {
797 return (
798 StatusCode::BAD_REQUEST,
799 Json(json!({
800 "error": "AccountAlreadyExists",
801 "message": "An account with this DID already exists"
802 })),
803 )
804 .into_response();
805 }
806 }
807 error!("Error inserting user: {:?}", e);
808 return (
809 StatusCode::INTERNAL_SERVER_ERROR,
810 Json(json!({"error": "InternalError"})),
811 )
812 .into_response();
813 }
814 };
815
816 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
817 Ok(enc) => enc,
818 Err(e) => {
819 error!("Error encrypting user key: {:?}", e);
820 return (
821 StatusCode::INTERNAL_SERVER_ERROR,
822 Json(json!({"error": "InternalError"})),
823 )
824 .into_response();
825 }
826 };
827 let key_insert = sqlx::query!(
828 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
829 user_id,
830 &encrypted_key_bytes[..],
831 crate::config::ENCRYPTION_VERSION
832 )
833 .execute(&mut *tx)
834 .await;
835 if let Err(e) = key_insert {
836 error!("Error inserting user key: {:?}", e);
837 return (
838 StatusCode::INTERNAL_SERVER_ERROR,
839 Json(json!({"error": "InternalError"})),
840 )
841 .into_response();
842 }
843 if let Some(key_id) = reserved_key_id {
844 let mark_used = sqlx::query!(
845 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
846 key_id
847 )
848 .execute(&mut *tx)
849 .await;
850 if let Err(e) = mark_used {
851 error!("Error marking reserved key as used: {:?}", e);
852 return (
853 StatusCode::INTERNAL_SERVER_ERROR,
854 Json(json!({"error": "InternalError"})),
855 )
856 .into_response();
857 }
858 }
859 let mst = Mst::new(Arc::new(state.block_store.clone()));
860 let mst_root = match mst.persist().await {
861 Ok(c) => c,
862 Err(e) => {
863 error!("Error persisting MST: {:?}", e);
864 return (
865 StatusCode::INTERNAL_SERVER_ERROR,
866 Json(json!({"error": "InternalError"})),
867 )
868 .into_response();
869 }
870 };
871 let rev = Tid::now(LimitedU32::MIN);
872 let (commit_bytes, _sig) =
873 match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
874 Ok(result) => result,
875 Err(e) => {
876 error!("Error creating genesis commit: {:?}", e);
877 return (
878 StatusCode::INTERNAL_SERVER_ERROR,
879 Json(json!({"error": "InternalError"})),
880 )
881 .into_response();
882 }
883 };
884 let commit_cid = match state.block_store.put(&commit_bytes).await {
885 Ok(c) => c,
886 Err(e) => {
887 error!("Error saving genesis commit: {:?}", e);
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": "InternalError"})),
891 )
892 .into_response();
893 }
894 };
895 let commit_cid_str = commit_cid.to_string();
896 let repo_insert = sqlx::query!(
897 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
898 user_id,
899 commit_cid_str
900 )
901 .execute(&mut *tx)
902 .await;
903 if let Err(e) = repo_insert {
904 error!("Error initializing repo: {:?}", e);
905 return (
906 StatusCode::INTERNAL_SERVER_ERROR,
907 Json(json!({"error": "InternalError"})),
908 )
909 .into_response();
910 }
911 if let Some(code) = &input.invite_code
912 && !code.trim().is_empty()
913 {
914 let use_insert = sqlx::query!(
915 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
916 code,
917 user_id
918 )
919 .execute(&mut *tx)
920 .await;
921 if let Err(e) = use_insert {
922 error!("Error recording invite usage: {:?}", e);
923 return (
924 StatusCode::INTERNAL_SERVER_ERROR,
925 Json(json!({"error": "InternalError"})),
926 )
927 .into_response();
928 }
929 }
930 if let Err(e) = tx.commit().await {
931 error!("Error committing transaction: {:?}", e);
932 return (
933 StatusCode::INTERNAL_SERVER_ERROR,
934 Json(json!({"error": "InternalError"})),
935 )
936 .into_response();
937 }
938 if !is_migration && !is_did_web_byod {
939 if let Err(e) =
940 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
941 {
942 warn!("Failed to sequence identity event for {}: {}", did, e);
943 }
944 if let Err(e) =
945 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
946 {
947 warn!("Failed to sequence account event for {}: {}", did, e);
948 }
949 let profile_record = json!({
950 "$type": "app.bsky.actor.profile",
951 "displayName": input.handle
952 });
953 if let Err(e) = crate::api::repo::record::create_record_internal(
954 &state,
955 &did,
956 "app.bsky.actor.profile",
957 "self",
958 &profile_record,
959 )
960 .await
961 {
962 warn!("Failed to create default profile for {}: {}", did, e);
963 }
964 }
965 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
966 if !is_migration {
967 if let Some(ref recipient) = verification_recipient {
968 let verification_token = crate::auth::verification_token::generate_signup_token(
969 &did,
970 verification_channel,
971 recipient,
972 );
973 let formatted_token =
974 crate::auth::verification_token::format_token_for_display(&verification_token);
975 if let Err(e) = crate::comms::enqueue_signup_verification(
976 &state.db,
977 user_id,
978 verification_channel,
979 recipient,
980 &formatted_token,
981 None,
982 )
983 .await
984 {
985 warn!(
986 "Failed to enqueue signup verification notification: {:?}",
987 e
988 );
989 }
990 }
991 } else if let Some(ref user_email) = email {
992 let token = crate::auth::verification_token::generate_migration_token(&did, user_email);
993 let formatted_token = crate::auth::verification_token::format_token_for_display(&token);
994 if let Err(e) = crate::comms::enqueue_migration_verification(
995 &state.db,
996 user_id,
997 user_email,
998 &formatted_token,
999 &hostname,
1000 )
1001 .await
1002 {
1003 warn!("Failed to enqueue migration verification email: {:?}", e);
1004 }
1005 }
1006
1007 let (access_jwt, refresh_jwt) = if is_migration {
1008 let access_meta =
1009 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
1010 Ok(m) => m,
1011 Err(e) => {
1012 error!("Error creating access token for migration: {:?}", e);
1013 return (
1014 StatusCode::INTERNAL_SERVER_ERROR,
1015 Json(json!({"error": "InternalError"})),
1016 )
1017 .into_response();
1018 }
1019 };
1020 let refresh_meta =
1021 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1022 Ok(m) => m,
1023 Err(e) => {
1024 error!("Error creating refresh token for migration: {:?}", e);
1025 return (
1026 StatusCode::INTERNAL_SERVER_ERROR,
1027 Json(json!({"error": "InternalError"})),
1028 )
1029 .into_response();
1030 }
1031 };
1032 if let Err(e) = sqlx::query!(
1033 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1034 did,
1035 access_meta.jti,
1036 refresh_meta.jti,
1037 access_meta.expires_at,
1038 refresh_meta.expires_at
1039 )
1040 .execute(&state.db)
1041 .await
1042 {
1043 error!("Error creating session for migration: {:?}", e);
1044 return (
1045 StatusCode::INTERNAL_SERVER_ERROR,
1046 Json(json!({"error": "InternalError"})),
1047 )
1048 .into_response();
1049 }
1050 (Some(access_meta.token), Some(refresh_meta.token))
1051 } else {
1052 (None, None)
1053 };
1054
1055 (
1056 StatusCode::OK,
1057 Json(CreateAccountOutput {
1058 handle: handle.clone(),
1059 did,
1060 access_jwt,
1061 refresh_jwt,
1062 verification_required: !is_migration,
1063 verification_channel: verification_channel.to_string(),
1064 }),
1065 )
1066 .into_response()
1067}