this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
38#[derive(Deserialize)]
39#[serde(rename_all = "camelCase")]
40pub struct CreateAccountInput {
41 pub handle: String,
42 pub email: Option<String>,
43 pub password: String,
44 pub invite_code: Option<String>,
45 pub did: Option<String>,
46 pub did_type: Option<String>,
47 pub signing_key: Option<String>,
48 pub verification_channel: Option<String>,
49 pub discord_id: Option<String>,
50 pub telegram_username: Option<String>,
51 pub signal_number: Option<String>,
52}
53
54#[derive(Serialize)]
55#[serde(rename_all = "camelCase")]
56pub struct CreateAccountOutput {
57 pub handle: String,
58 pub did: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub access_jwt: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub refresh_jwt: Option<String>,
63 pub verification_required: bool,
64 pub verification_channel: String,
65}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 info!("create_account called");
73 let client_ip = extract_client_ip(&headers);
74 if !state
75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
76 .await
77 {
78 warn!(ip = %client_ip, "Account creation rate limit exceeded");
79 return (
80 StatusCode::TOO_MANY_REQUESTS,
81 Json(json!({
82 "error": "RateLimitExceeded",
83 "message": "Too many account creation attempts. Please try again later."
84 })),
85 )
86 .into_response();
87 }
88
89 let migration_auth = if let Some(token) =
90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
91 {
92 if is_service_token(&token) {
93 let verifier = ServiceTokenVerifier::new();
94 match verifier
95 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
96 .await
97 {
98 Ok(claims) => {
99 debug!("Service token verified for migration: iss={}", claims.iss);
100 Some(claims.iss)
101 }
102 Err(e) => {
103 error!("Service token verification failed: {:?}", e);
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({
107 "error": "AuthenticationFailed",
108 "message": format!("Service token verification failed: {}", e)
109 })),
110 )
111 .into_response();
112 }
113 }
114 } else {
115 None
116 }
117 } else {
118 None
119 };
120
121 let is_migration = migration_auth.is_some()
122 && input
123 .did
124 .as_ref()
125 .map(|d| d.starts_with("did:plc:"))
126 .unwrap_or(false);
127
128 if is_migration {
129 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
130 {
131 if migration_did != auth_did {
132 return (
133 StatusCode::FORBIDDEN,
134 Json(json!({
135 "error": "AuthorizationError",
136 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
137 })),
138 )
139 .into_response();
140 }
141 info!(did = %migration_did, "Processing account migration");
142 }
143 }
144
145 let hostname_for_validation =
146 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
147 let pds_suffix = format!(".{}", hostname_for_validation);
148
149 let validated_short_handle = if !input.handle.contains('.')
150 || input.handle.ends_with(&pds_suffix)
151 {
152 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
153 input
154 .handle
155 .strip_suffix(&pds_suffix)
156 .unwrap_or(&input.handle)
157 } else {
158 &input.handle
159 };
160 match crate::api::validation::validate_short_handle(handle_to_validate) {
161 Ok(h) => h,
162 Err(e) => {
163 return (
164 StatusCode::BAD_REQUEST,
165 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
166 )
167 .into_response();
168 }
169 }
170 } else {
171 if input.handle.contains(' ') || input.handle.contains('\t') {
172 return (
173 StatusCode::BAD_REQUEST,
174 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
175 )
176 .into_response();
177 }
178 for c in input.handle.chars() {
179 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
180 return (
181 StatusCode::BAD_REQUEST,
182 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
183 )
184 .into_response();
185 }
186 }
187 input.handle.to_lowercase()
188 };
189 let email: Option<String> = input
190 .email
191 .as_ref()
192 .map(|e| e.trim().to_string())
193 .filter(|e| !e.is_empty());
194 if let Some(ref email) = email
195 && !crate::api::validation::is_valid_email(email)
196 {
197 return (
198 StatusCode::BAD_REQUEST,
199 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
200 )
201 .into_response();
202 }
203 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
204 let valid_channels = ["email", "discord", "telegram", "signal"];
205 if !valid_channels.contains(&verification_channel) && !is_migration {
206 return (
207 StatusCode::BAD_REQUEST,
208 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
209 )
210 .into_response();
211 }
212 let verification_recipient = if is_migration {
213 None
214 } else {
215 Some(match verification_channel {
216 "email" => match &input.email {
217 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
218 _ => return (
219 StatusCode::BAD_REQUEST,
220 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
221 ).into_response(),
222 },
223 "discord" => match &input.discord_id {
224 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
225 _ => return (
226 StatusCode::BAD_REQUEST,
227 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
228 ).into_response(),
229 },
230 "telegram" => match &input.telegram_username {
231 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
232 _ => return (
233 StatusCode::BAD_REQUEST,
234 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
235 ).into_response(),
236 },
237 "signal" => match &input.signal_number {
238 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
239 _ => return (
240 StatusCode::BAD_REQUEST,
241 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
242 ).into_response(),
243 },
244 _ => return (
245 StatusCode::BAD_REQUEST,
246 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
247 ).into_response(),
248 })
249 };
250 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
251 let pds_endpoint = format!("https://{}", hostname);
252 let suffix = format!(".{}", hostname);
253 let handle = if input.handle.ends_with(&suffix) {
254 format!("{}.{}", validated_short_handle, hostname)
255 } else if input.handle.contains('.') {
256 validated_short_handle.clone()
257 } else {
258 format!("{}.{}", validated_short_handle, hostname)
259 };
260 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
261 if let Some(signing_key_did) = &input.signing_key {
262 let reserved = sqlx::query!(
263 r#"
264 SELECT id, private_key_bytes
265 FROM reserved_signing_keys
266 WHERE public_key_did_key = $1
267 AND used_at IS NULL
268 AND expires_at > NOW()
269 FOR UPDATE
270 "#,
271 signing_key_did
272 )
273 .fetch_optional(&state.db)
274 .await;
275 match reserved {
276 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
277 Ok(None) => {
278 return (
279 StatusCode::BAD_REQUEST,
280 Json(json!({
281 "error": "InvalidSigningKey",
282 "message": "Signing key not found, already used, or expired"
283 })),
284 )
285 .into_response();
286 }
287 Err(e) => {
288 error!("Error looking up reserved signing key: {:?}", e);
289 return (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError"})),
292 )
293 .into_response();
294 }
295 }
296 } else {
297 let secret_key = SecretKey::random(&mut OsRng);
298 (secret_key.to_bytes().to_vec(), None)
299 };
300 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
301 Ok(k) => k,
302 Err(e) => {
303 error!("Error creating signing key: {:?}", e);
304 return (
305 StatusCode::INTERNAL_SERVER_ERROR,
306 Json(json!({"error": "InternalError"})),
307 )
308 .into_response();
309 }
310 };
311 let did_type = input.did_type.as_deref().unwrap_or("plc");
312 let did = match did_type {
313 "web" => {
314 let subdomain_host = format!("{}.{}", input.handle, hostname);
315 let encoded_subdomain = subdomain_host.replace(':', "%3A");
316 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
317 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
318 self_hosted_did
319 }
320 "web-external" => {
321 let d = match &input.did {
322 Some(d) if !d.trim().is_empty() => d,
323 _ => {
324 return (
325 StatusCode::BAD_REQUEST,
326 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
327 )
328 .into_response();
329 }
330 };
331 if !d.starts_with("did:web:") {
332 return (
333 StatusCode::BAD_REQUEST,
334 Json(
335 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
336 ),
337 )
338 .into_response();
339 }
340 if let Err(e) =
341 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
342 {
343 return (
344 StatusCode::BAD_REQUEST,
345 Json(json!({"error": "InvalidDid", "message": e})),
346 )
347 .into_response();
348 }
349 info!(did = %d, "Creating external did:web account");
350 d.clone()
351 }
352 _ => {
353 if let Some(d) = &input.did {
354 if d.starts_with("did:plc:") && is_migration {
355 info!(did = %d, "Migration with existing did:plc");
356 d.clone()
357 } else if d.starts_with("did:web:") {
358 if let Err(e) =
359 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref())
360 .await
361 {
362 return (
363 StatusCode::BAD_REQUEST,
364 Json(json!({"error": "InvalidDid", "message": e})),
365 )
366 .into_response();
367 }
368 d.clone()
369 } else if !d.trim().is_empty() {
370 return (
371 StatusCode::BAD_REQUEST,
372 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
373 )
374 .into_response();
375 } else {
376 let rotation_key = std::env::var("PLC_ROTATION_KEY")
377 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
378 let genesis_result = match create_genesis_operation(
379 &signing_key,
380 &rotation_key,
381 &handle,
382 &pds_endpoint,
383 ) {
384 Ok(r) => r,
385 Err(e) => {
386 error!("Error creating PLC genesis operation: {:?}", e);
387 return (
388 StatusCode::INTERNAL_SERVER_ERROR,
389 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
390 )
391 .into_response();
392 }
393 };
394 let plc_client = PlcClient::new(None);
395 if let Err(e) = plc_client
396 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
397 .await
398 {
399 error!("Failed to submit PLC genesis operation: {:?}", e);
400 return (
401 StatusCode::BAD_GATEWAY,
402 Json(json!({
403 "error": "UpstreamError",
404 "message": format!("Failed to register DID with PLC directory: {}", e)
405 })),
406 )
407 .into_response();
408 }
409 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
410 genesis_result.did
411 }
412 } else {
413 let rotation_key = std::env::var("PLC_ROTATION_KEY")
414 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
415 let genesis_result = match create_genesis_operation(
416 &signing_key,
417 &rotation_key,
418 &handle,
419 &pds_endpoint,
420 ) {
421 Ok(r) => r,
422 Err(e) => {
423 error!("Error creating PLC genesis operation: {:?}", e);
424 return (
425 StatusCode::INTERNAL_SERVER_ERROR,
426 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
427 )
428 .into_response();
429 }
430 };
431 let plc_client = PlcClient::new(None);
432 if let Err(e) = plc_client
433 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
434 .await
435 {
436 error!("Failed to submit PLC genesis operation: {:?}", e);
437 return (
438 StatusCode::BAD_GATEWAY,
439 Json(json!({
440 "error": "UpstreamError",
441 "message": format!("Failed to register DID with PLC directory: {}", e)
442 })),
443 )
444 .into_response();
445 }
446 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
447 genesis_result.did
448 }
449 }
450 };
451 let mut tx = match state.db.begin().await {
452 Ok(tx) => tx,
453 Err(e) => {
454 error!("Error starting transaction: {:?}", e);
455 return (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(json!({"error": "InternalError"})),
458 )
459 .into_response();
460 }
461 };
462 if is_migration {
463 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
464 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
465 .bind(&did)
466 .fetch_optional(&mut *tx)
467 .await
468 .unwrap_or(None);
469 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
470 if deactivated_at.is_some() {
471 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
472 let update_result: Result<_, sqlx::Error> =
473 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
474 .bind(&handle)
475 .bind(account_id)
476 .execute(&mut *tx)
477 .await;
478 if let Err(e) = update_result {
479 if let Some(db_err) = e.as_database_error()
480 && db_err
481 .constraint()
482 .map(|c| c.contains("handle"))
483 .unwrap_or(false)
484 {
485 return (
486 StatusCode::BAD_REQUEST,
487 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
488 )
489 .into_response();
490 }
491 error!("Error reactivating account: {:?}", e);
492 return (
493 StatusCode::INTERNAL_SERVER_ERROR,
494 Json(json!({"error": "InternalError"})),
495 )
496 .into_response();
497 }
498 if let Err(e) = tx.commit().await {
499 error!("Error committing reactivation: {:?}", e);
500 return (
501 StatusCode::INTERNAL_SERVER_ERROR,
502 Json(json!({"error": "InternalError"})),
503 )
504 .into_response();
505 }
506 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
507 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
508 )
509 .bind(account_id)
510 .fetch_optional(&state.db)
511 .await
512 .unwrap_or(None);
513 let secret_key_bytes = match key_row {
514 Some((key_bytes, encryption_version)) => {
515 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
516 Ok(k) => k,
517 Err(e) => {
518 error!("Error decrypting key for reactivated account: {:?}", e);
519 return (
520 StatusCode::INTERNAL_SERVER_ERROR,
521 Json(json!({"error": "InternalError"})),
522 )
523 .into_response();
524 }
525 }
526 }
527 None => {
528 error!("No signing key found for reactivated account");
529 return (
530 StatusCode::INTERNAL_SERVER_ERROR,
531 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
532 )
533 .into_response();
534 }
535 };
536 let access_meta =
537 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
538 Ok(m) => m,
539 Err(e) => {
540 error!("Error creating access token: {:?}", e);
541 return (
542 StatusCode::INTERNAL_SERVER_ERROR,
543 Json(json!({"error": "InternalError"})),
544 )
545 .into_response();
546 }
547 };
548 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
549 &did,
550 &secret_key_bytes,
551 ) {
552 Ok(m) => m,
553 Err(e) => {
554 error!("Error creating refresh token: {:?}", e);
555 return (
556 StatusCode::INTERNAL_SERVER_ERROR,
557 Json(json!({"error": "InternalError"})),
558 )
559 .into_response();
560 }
561 };
562 let session_result: Result<_, sqlx::Error> = sqlx::query(
563 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
564 )
565 .bind(&did)
566 .bind(&access_meta.jti)
567 .bind(&refresh_meta.jti)
568 .bind(access_meta.expires_at)
569 .bind(refresh_meta.expires_at)
570 .execute(&state.db)
571 .await;
572 if let Err(e) = session_result {
573 error!("Error creating session: {:?}", e);
574 return (
575 StatusCode::INTERNAL_SERVER_ERROR,
576 Json(json!({"error": "InternalError"})),
577 )
578 .into_response();
579 }
580 return (
581 StatusCode::OK,
582 Json(CreateAccountOutput {
583 handle: handle.clone(),
584 did,
585 access_jwt: Some(access_meta.token),
586 refresh_jwt: Some(refresh_meta.token),
587 verification_required: false,
588 verification_channel: "email".to_string(),
589 }),
590 )
591 .into_response();
592 } else {
593 return (
594 StatusCode::BAD_REQUEST,
595 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
596 )
597 .into_response();
598 }
599 }
600 }
601 let exists_result: Option<(i32,)> =
602 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
603 .bind(&handle)
604 .fetch_optional(&mut *tx)
605 .await
606 .unwrap_or(None);
607 if exists_result.is_some() {
608 return (
609 StatusCode::BAD_REQUEST,
610 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
611 )
612 .into_response();
613 }
614 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
615 .map(|v| v == "true" || v == "1")
616 .unwrap_or(false);
617 if invite_code_required
618 && input
619 .invite_code
620 .as_ref()
621 .map(|c| c.trim().is_empty())
622 .unwrap_or(true)
623 {
624 return (
625 StatusCode::BAD_REQUEST,
626 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
627 )
628 .into_response();
629 }
630 if let Some(code) = &input.invite_code
631 && !code.trim().is_empty()
632 {
633 let invite_query = sqlx::query!(
634 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
635 code
636 )
637 .fetch_optional(&mut *tx)
638 .await;
639 match invite_query {
640 Ok(Some(row)) => {
641 if row.available_uses <= 0 {
642 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
643 }
644 let update_invite = sqlx::query!(
645 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
646 code
647 )
648 .execute(&mut *tx)
649 .await;
650 if let Err(e) = update_invite {
651 error!("Error updating invite code: {:?}", e);
652 return (
653 StatusCode::INTERNAL_SERVER_ERROR,
654 Json(json!({"error": "InternalError"})),
655 )
656 .into_response();
657 }
658 }
659 Ok(None) => {
660 return (
661 StatusCode::BAD_REQUEST,
662 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
663 )
664 .into_response();
665 }
666 Err(e) => {
667 error!("Error checking invite code: {:?}", e);
668 return (
669 StatusCode::INTERNAL_SERVER_ERROR,
670 Json(json!({"error": "InternalError"})),
671 )
672 .into_response();
673 }
674 }
675 }
676 if let Err(e) = validate_password(&input.password) {
677 return (
678 StatusCode::BAD_REQUEST,
679 Json(json!({
680 "error": "InvalidPassword",
681 "message": e.to_string()
682 })),
683 )
684 .into_response();
685 }
686
687 let password_hash = match hash(&input.password, DEFAULT_COST) {
688 Ok(h) => h,
689 Err(e) => {
690 error!("Error hashing password: {:?}", e);
691 return (
692 StatusCode::INTERNAL_SERVER_ERROR,
693 Json(json!({"error": "InternalError"})),
694 )
695 .into_response();
696 }
697 };
698 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
699 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
700 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
701 .fetch_one(&mut *tx)
702 .await
703 .map(|c| c.unwrap_or(0) == 0)
704 .unwrap_or(false);
705 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
706 Some(chrono::Utc::now())
707 } else {
708 None
709 };
710 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
711 r#"INSERT INTO users (
712 handle, email, did, password_hash,
713 preferred_comms_channel,
714 discord_id, telegram_username, signal_number,
715 is_admin, deactivated_at, email_verified
716 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
717 )
718 .bind(&handle)
719 .bind(&email)
720 .bind(&did)
721 .bind(&password_hash)
722 .bind(verification_channel)
723 .bind(
724 input
725 .discord_id
726 .as_deref()
727 .map(|s| s.trim())
728 .filter(|s| !s.is_empty()),
729 )
730 .bind(
731 input
732 .telegram_username
733 .as_deref()
734 .map(|s| s.trim())
735 .filter(|s| !s.is_empty()),
736 )
737 .bind(
738 input
739 .signal_number
740 .as_deref()
741 .map(|s| s.trim())
742 .filter(|s| !s.is_empty()),
743 )
744 .bind(is_first_user)
745 .bind(deactivated_at)
746 .bind(is_migration)
747 .fetch_one(&mut *tx)
748 .await;
749 let user_id = match user_insert {
750 Ok((id,)) => id,
751 Err(e) => {
752 if let Some(db_err) = e.as_database_error()
753 && db_err.code().as_deref() == Some("23505")
754 {
755 let constraint = db_err.constraint().unwrap_or("");
756 if constraint.contains("handle") || constraint.contains("users_handle") {
757 return (
758 StatusCode::BAD_REQUEST,
759 Json(json!({
760 "error": "HandleNotAvailable",
761 "message": "Handle already taken"
762 })),
763 )
764 .into_response();
765 } else if constraint.contains("email") || constraint.contains("users_email") {
766 return (
767 StatusCode::BAD_REQUEST,
768 Json(json!({
769 "error": "InvalidEmail",
770 "message": "Email already registered"
771 })),
772 )
773 .into_response();
774 } else if constraint.contains("did") || constraint.contains("users_did") {
775 return (
776 StatusCode::BAD_REQUEST,
777 Json(json!({
778 "error": "AccountAlreadyExists",
779 "message": "An account with this DID already exists"
780 })),
781 )
782 .into_response();
783 }
784 }
785 error!("Error inserting user: {:?}", e);
786 return (
787 StatusCode::INTERNAL_SERVER_ERROR,
788 Json(json!({"error": "InternalError"})),
789 )
790 .into_response();
791 }
792 };
793
794 if !is_migration
795 && let Some(ref recipient) = verification_recipient
796 && let Err(e) = sqlx::query!(
797 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)",
798 user_id,
799 verification_channel as _,
800 verification_code,
801 recipient,
802 code_expires_at
803 )
804 .execute(&mut *tx)
805 .await {
806 error!("Error inserting verification code: {:?}", e);
807 return (
808 StatusCode::INTERNAL_SERVER_ERROR,
809 Json(json!({"error": "InternalError"})),
810 )
811 .into_response();
812 }
813 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
814 Ok(enc) => enc,
815 Err(e) => {
816 error!("Error encrypting user key: {:?}", e);
817 return (
818 StatusCode::INTERNAL_SERVER_ERROR,
819 Json(json!({"error": "InternalError"})),
820 )
821 .into_response();
822 }
823 };
824 let key_insert = sqlx::query!(
825 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
826 user_id,
827 &encrypted_key_bytes[..],
828 crate::config::ENCRYPTION_VERSION
829 )
830 .execute(&mut *tx)
831 .await;
832 if let Err(e) = key_insert {
833 error!("Error inserting user key: {:?}", e);
834 return (
835 StatusCode::INTERNAL_SERVER_ERROR,
836 Json(json!({"error": "InternalError"})),
837 )
838 .into_response();
839 }
840 if let Some(key_id) = reserved_key_id {
841 let mark_used = sqlx::query!(
842 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
843 key_id
844 )
845 .execute(&mut *tx)
846 .await;
847 if let Err(e) = mark_used {
848 error!("Error marking reserved key as used: {:?}", e);
849 return (
850 StatusCode::INTERNAL_SERVER_ERROR,
851 Json(json!({"error": "InternalError"})),
852 )
853 .into_response();
854 }
855 }
856 let mst = Mst::new(Arc::new(state.block_store.clone()));
857 let mst_root = match mst.persist().await {
858 Ok(c) => c,
859 Err(e) => {
860 error!("Error persisting MST: {:?}", e);
861 return (
862 StatusCode::INTERNAL_SERVER_ERROR,
863 Json(json!({"error": "InternalError"})),
864 )
865 .into_response();
866 }
867 };
868 let rev = Tid::now(LimitedU32::MIN);
869 let (commit_bytes, _sig) = match create_signed_commit(&did, mst_root, &rev.to_string(), None, &signing_key) {
870 Ok(result) => result,
871 Err(e) => {
872 error!("Error creating genesis commit: {:?}", e);
873 return (
874 StatusCode::INTERNAL_SERVER_ERROR,
875 Json(json!({"error": "InternalError"})),
876 )
877 .into_response();
878 }
879 };
880 let commit_cid = match state.block_store.put(&commit_bytes).await {
881 Ok(c) => c,
882 Err(e) => {
883 error!("Error saving genesis commit: {:?}", e);
884 return (
885 StatusCode::INTERNAL_SERVER_ERROR,
886 Json(json!({"error": "InternalError"})),
887 )
888 .into_response();
889 }
890 };
891 let commit_cid_str = commit_cid.to_string();
892 let repo_insert = sqlx::query!(
893 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
894 user_id,
895 commit_cid_str
896 )
897 .execute(&mut *tx)
898 .await;
899 if let Err(e) = repo_insert {
900 error!("Error initializing repo: {:?}", e);
901 return (
902 StatusCode::INTERNAL_SERVER_ERROR,
903 Json(json!({"error": "InternalError"})),
904 )
905 .into_response();
906 }
907 if let Some(code) = &input.invite_code
908 && !code.trim().is_empty()
909 {
910 let use_insert = sqlx::query!(
911 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
912 code,
913 user_id
914 )
915 .execute(&mut *tx)
916 .await;
917 if let Err(e) = use_insert {
918 error!("Error recording invite usage: {:?}", e);
919 return (
920 StatusCode::INTERNAL_SERVER_ERROR,
921 Json(json!({"error": "InternalError"})),
922 )
923 .into_response();
924 }
925 }
926 if let Err(e) = tx.commit().await {
927 error!("Error committing transaction: {:?}", e);
928 return (
929 StatusCode::INTERNAL_SERVER_ERROR,
930 Json(json!({"error": "InternalError"})),
931 )
932 .into_response();
933 }
934 if !is_migration {
935 if let Err(e) =
936 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
937 {
938 warn!("Failed to sequence identity event for {}: {}", did, e);
939 }
940 if let Err(e) =
941 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
942 {
943 warn!("Failed to sequence account event for {}: {}", did, e);
944 }
945 let profile_record = json!({
946 "$type": "app.bsky.actor.profile",
947 "displayName": input.handle
948 });
949 if let Err(e) = crate::api::repo::record::create_record_internal(
950 &state,
951 &did,
952 "app.bsky.actor.profile",
953 "self",
954 &profile_record,
955 )
956 .await
957 {
958 warn!("Failed to create default profile for {}: {}", did, e);
959 }
960 if let Some(ref recipient) = verification_recipient
961 && let Err(e) = crate::comms::enqueue_signup_verification(
962 &state.db,
963 user_id,
964 verification_channel,
965 recipient,
966 &verification_code,
967 None,
968 )
969 .await
970 {
971 warn!(
972 "Failed to enqueue signup verification notification: {:?}",
973 e
974 );
975 }
976 }
977
978 let (access_jwt, refresh_jwt) = if is_migration {
979 let access_meta =
980 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
981 Ok(m) => m,
982 Err(e) => {
983 error!("Error creating access token for migration: {:?}", e);
984 return (
985 StatusCode::INTERNAL_SERVER_ERROR,
986 Json(json!({"error": "InternalError"})),
987 )
988 .into_response();
989 }
990 };
991 let refresh_meta =
992 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
993 Ok(m) => m,
994 Err(e) => {
995 error!("Error creating refresh token for migration: {:?}", e);
996 return (
997 StatusCode::INTERNAL_SERVER_ERROR,
998 Json(json!({"error": "InternalError"})),
999 )
1000 .into_response();
1001 }
1002 };
1003 if let Err(e) = sqlx::query!(
1004 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1005 did,
1006 access_meta.jti,
1007 refresh_meta.jti,
1008 access_meta.expires_at,
1009 refresh_meta.expires_at
1010 )
1011 .execute(&state.db)
1012 .await
1013 {
1014 error!("Error creating session for migration: {:?}", e);
1015 return (
1016 StatusCode::INTERNAL_SERVER_ERROR,
1017 Json(json!({"error": "InternalError"})),
1018 )
1019 .into_response();
1020 }
1021 (Some(access_meta.token), Some(refresh_meta.token))
1022 } else {
1023 (None, None)
1024 };
1025
1026 (
1027 StatusCode::OK,
1028 Json(CreateAccountOutput {
1029 handle: handle.clone(),
1030 did,
1031 access_jwt,
1032 refresh_jwt,
1033 verification_required: !is_migration,
1034 verification_channel: verification_channel.to_string(),
1035 }),
1036 )
1037 .into_response()
1038}