// This repository has no description.
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use bcrypt::{DEFAULT_COST, hash};
12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use k256::{SecretKey, ecdsa::SigningKey};
15use rand::rngs::OsRng;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21fn extract_client_ip(headers: &HeaderMap) -> String {
22 if let Some(forwarded) = headers.get("x-forwarded-for")
23 && let Ok(value) = forwarded.to_str()
24 && let Some(first_ip) = value.split(',').next()
25 {
26 return first_ip.trim().to_string();
27 }
28 if let Some(real_ip) = headers.get("x-real-ip")
29 && let Ok(value) = real_ip.to_str()
30 {
31 return value.trim().to_string();
32 }
33 "unknown".to_string()
34}
35
36#[derive(Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct CreateAccountInput {
39 pub handle: String,
40 pub email: Option<String>,
41 pub password: String,
42 pub invite_code: Option<String>,
43 pub did: Option<String>,
44 pub did_type: Option<String>,
45 pub signing_key: Option<String>,
46 pub verification_channel: Option<String>,
47 pub discord_id: Option<String>,
48 pub telegram_username: Option<String>,
49 pub signal_number: Option<String>,
50}
51
52#[derive(Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct CreateAccountOutput {
55 pub handle: String,
56 pub did: String,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub access_jwt: Option<String>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub refresh_jwt: Option<String>,
61 pub verification_required: bool,
62 pub verification_channel: String,
63}
64
65pub async fn create_account(
66 State(state): State<AppState>,
67 headers: HeaderMap,
68 Json(input): Json<CreateAccountInput>,
69) -> Response {
70 info!("create_account called");
71 let client_ip = extract_client_ip(&headers);
72 if !state
73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
74 .await
75 {
76 warn!(ip = %client_ip, "Account creation rate limit exceeded");
77 return (
78 StatusCode::TOO_MANY_REQUESTS,
79 Json(json!({
80 "error": "RateLimitExceeded",
81 "message": "Too many account creation attempts. Please try again later."
82 })),
83 )
84 .into_response();
85 }
86
87 let migration_auth = if let Some(token) =
88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
89 {
90 if is_service_token(&token) {
91 let verifier = ServiceTokenVerifier::new();
92 match verifier
93 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
94 .await
95 {
96 Ok(claims) => {
97 debug!("Service token verified for migration: iss={}", claims.iss);
98 Some(claims.iss)
99 }
100 Err(e) => {
101 error!("Service token verification failed: {:?}", e);
102 return (
103 StatusCode::UNAUTHORIZED,
104 Json(json!({
105 "error": "AuthenticationFailed",
106 "message": format!("Service token verification failed: {}", e)
107 })),
108 )
109 .into_response();
110 }
111 }
112 } else {
113 None
114 }
115 } else {
116 None
117 };
118
119 let is_migration = migration_auth.is_some()
120 && input
121 .did
122 .as_ref()
123 .map(|d| d.starts_with("did:plc:"))
124 .unwrap_or(false);
125
126 if is_migration {
127 let migration_did = input.did.as_ref().unwrap();
128 let auth_did = migration_auth.as_ref().unwrap();
129 if migration_did != auth_did {
130 return (
131 StatusCode::FORBIDDEN,
132 Json(json!({
133 "error": "AuthorizationError",
134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
135 })),
136 )
137 .into_response();
138 }
139 info!(did = %migration_did, "Processing account migration");
140 }
141
142 let hostname_for_validation =
143 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
144 let pds_suffix = format!(".{}", hostname_for_validation);
145
146 let validated_short_handle = if !input.handle.contains('.')
147 || input.handle.ends_with(&pds_suffix)
148 {
149 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
150 input
151 .handle
152 .strip_suffix(&pds_suffix)
153 .unwrap_or(&input.handle)
154 } else {
155 &input.handle
156 };
157 match crate::api::validation::validate_short_handle(handle_to_validate) {
158 Ok(h) => h,
159 Err(e) => {
160 return (
161 StatusCode::BAD_REQUEST,
162 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
163 )
164 .into_response();
165 }
166 }
167 } else {
168 if input.handle.contains(' ') || input.handle.contains('\t') {
169 return (
170 StatusCode::BAD_REQUEST,
171 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
172 )
173 .into_response();
174 }
175 for c in input.handle.chars() {
176 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
177 return (
178 StatusCode::BAD_REQUEST,
179 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
180 )
181 .into_response();
182 }
183 }
184 input.handle.to_lowercase()
185 };
186 let email: Option<String> = input
187 .email
188 .as_ref()
189 .map(|e| e.trim().to_string())
190 .filter(|e| !e.is_empty());
191 if let Some(ref email) = email
192 && !crate::api::validation::is_valid_email(email)
193 {
194 return (
195 StatusCode::BAD_REQUEST,
196 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
197 )
198 .into_response();
199 }
200 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
201 let valid_channels = ["email", "discord", "telegram", "signal"];
202 if !valid_channels.contains(&verification_channel) && !is_migration {
203 return (
204 StatusCode::BAD_REQUEST,
205 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
206 )
207 .into_response();
208 }
209 let verification_recipient = if is_migration {
210 None
211 } else {
212 Some(match verification_channel {
213 "email" => match &input.email {
214 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
215 _ => return (
216 StatusCode::BAD_REQUEST,
217 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
218 ).into_response(),
219 },
220 "discord" => match &input.discord_id {
221 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
222 _ => return (
223 StatusCode::BAD_REQUEST,
224 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
225 ).into_response(),
226 },
227 "telegram" => match &input.telegram_username {
228 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
229 _ => return (
230 StatusCode::BAD_REQUEST,
231 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
232 ).into_response(),
233 },
234 "signal" => match &input.signal_number {
235 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
236 _ => return (
237 StatusCode::BAD_REQUEST,
238 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
239 ).into_response(),
240 },
241 _ => return (
242 StatusCode::BAD_REQUEST,
243 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
244 ).into_response(),
245 })
246 };
247 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
248 let pds_endpoint = format!("https://{}", hostname);
249 let suffix = format!(".{}", hostname);
250 let handle = if input.handle.ends_with(&suffix) {
251 format!("{}.{}", validated_short_handle, hostname)
252 } else if input.handle.contains('.') {
253 validated_short_handle.clone()
254 } else {
255 format!("{}.{}", validated_short_handle, hostname)
256 };
257 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
258 if let Some(signing_key_did) = &input.signing_key {
259 let reserved = sqlx::query!(
260 r#"
261 SELECT id, private_key_bytes
262 FROM reserved_signing_keys
263 WHERE public_key_did_key = $1
264 AND used_at IS NULL
265 AND expires_at > NOW()
266 FOR UPDATE
267 "#,
268 signing_key_did
269 )
270 .fetch_optional(&state.db)
271 .await;
272 match reserved {
273 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
274 Ok(None) => {
275 return (
276 StatusCode::BAD_REQUEST,
277 Json(json!({
278 "error": "InvalidSigningKey",
279 "message": "Signing key not found, already used, or expired"
280 })),
281 )
282 .into_response();
283 }
284 Err(e) => {
285 error!("Error looking up reserved signing key: {:?}", e);
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError"})),
289 )
290 .into_response();
291 }
292 }
293 } else {
294 let secret_key = SecretKey::random(&mut OsRng);
295 (secret_key.to_bytes().to_vec(), None)
296 };
297 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
298 Ok(k) => k,
299 Err(e) => {
300 error!("Error creating signing key: {:?}", e);
301 return (
302 StatusCode::INTERNAL_SERVER_ERROR,
303 Json(json!({"error": "InternalError"})),
304 )
305 .into_response();
306 }
307 };
308 let did_type = input.did_type.as_deref().unwrap_or("plc");
309 let did = match did_type {
310 "web" => {
311 let subdomain_host = format!("{}.{}", input.handle, hostname);
312 let encoded_subdomain = subdomain_host.replace(':', "%3A");
313 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
314 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
315 self_hosted_did
316 }
317 "web-external" => {
318 let d = match &input.did {
319 Some(d) if !d.trim().is_empty() => d,
320 _ => {
321 return (
322 StatusCode::BAD_REQUEST,
323 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
324 )
325 .into_response();
326 }
327 };
328 if !d.starts_with("did:web:") {
329 return (
330 StatusCode::BAD_REQUEST,
331 Json(
332 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
333 ),
334 )
335 .into_response();
336 }
337 if let Err(e) =
338 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
339 {
340 return (
341 StatusCode::BAD_REQUEST,
342 Json(json!({"error": "InvalidDid", "message": e})),
343 )
344 .into_response();
345 }
346 info!(did = %d, "Creating external did:web account");
347 d.clone()
348 }
349 _ => {
350 if let Some(d) = &input.did {
351 if d.starts_with("did:plc:") && is_migration {
352 info!(did = %d, "Migration with existing did:plc");
353 d.clone()
354 } else if d.starts_with("did:web:") {
355 if let Err(e) =
356 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref())
357 .await
358 {
359 return (
360 StatusCode::BAD_REQUEST,
361 Json(json!({"error": "InvalidDid", "message": e})),
362 )
363 .into_response();
364 }
365 d.clone()
366 } else if !d.trim().is_empty() {
367 return (
368 StatusCode::BAD_REQUEST,
369 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
370 )
371 .into_response();
372 } else {
373 let rotation_key = std::env::var("PLC_ROTATION_KEY")
374 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
375 let genesis_result = match create_genesis_operation(
376 &signing_key,
377 &rotation_key,
378 &handle,
379 &pds_endpoint,
380 ) {
381 Ok(r) => r,
382 Err(e) => {
383 error!("Error creating PLC genesis operation: {:?}", e);
384 return (
385 StatusCode::INTERNAL_SERVER_ERROR,
386 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
387 )
388 .into_response();
389 }
390 };
391 let plc_client = PlcClient::new(None);
392 if let Err(e) = plc_client
393 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
394 .await
395 {
396 error!("Failed to submit PLC genesis operation: {:?}", e);
397 return (
398 StatusCode::BAD_GATEWAY,
399 Json(json!({
400 "error": "UpstreamError",
401 "message": format!("Failed to register DID with PLC directory: {}", e)
402 })),
403 )
404 .into_response();
405 }
406 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
407 genesis_result.did
408 }
409 } else {
410 let rotation_key = std::env::var("PLC_ROTATION_KEY")
411 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
412 let genesis_result = match create_genesis_operation(
413 &signing_key,
414 &rotation_key,
415 &handle,
416 &pds_endpoint,
417 ) {
418 Ok(r) => r,
419 Err(e) => {
420 error!("Error creating PLC genesis operation: {:?}", e);
421 return (
422 StatusCode::INTERNAL_SERVER_ERROR,
423 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
424 )
425 .into_response();
426 }
427 };
428 let plc_client = PlcClient::new(None);
429 if let Err(e) = plc_client
430 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
431 .await
432 {
433 error!("Failed to submit PLC genesis operation: {:?}", e);
434 return (
435 StatusCode::BAD_GATEWAY,
436 Json(json!({
437 "error": "UpstreamError",
438 "message": format!("Failed to register DID with PLC directory: {}", e)
439 })),
440 )
441 .into_response();
442 }
443 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
444 genesis_result.did
445 }
446 }
447 };
448 let mut tx = match state.db.begin().await {
449 Ok(tx) => tx,
450 Err(e) => {
451 error!("Error starting transaction: {:?}", e);
452 return (
453 StatusCode::INTERNAL_SERVER_ERROR,
454 Json(json!({"error": "InternalError"})),
455 )
456 .into_response();
457 }
458 };
459 if is_migration {
460 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
461 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
462 .bind(&did)
463 .fetch_optional(&mut *tx)
464 .await
465 .unwrap_or(None);
466 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
467 if deactivated_at.is_some() {
468 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
469 let update_result: Result<_, sqlx::Error> =
470 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
471 .bind(&handle)
472 .bind(account_id)
473 .execute(&mut *tx)
474 .await;
475 if let Err(e) = update_result {
476 if let Some(db_err) = e.as_database_error()
477 && db_err
478 .constraint()
479 .map(|c| c.contains("handle"))
480 .unwrap_or(false)
481 {
482 return (
483 StatusCode::BAD_REQUEST,
484 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
485 )
486 .into_response();
487 }
488 error!("Error reactivating account: {:?}", e);
489 return (
490 StatusCode::INTERNAL_SERVER_ERROR,
491 Json(json!({"error": "InternalError"})),
492 )
493 .into_response();
494 }
495 if let Err(e) = tx.commit().await {
496 error!("Error committing reactivation: {:?}", e);
497 return (
498 StatusCode::INTERNAL_SERVER_ERROR,
499 Json(json!({"error": "InternalError"})),
500 )
501 .into_response();
502 }
503 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
504 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
505 )
506 .bind(account_id)
507 .fetch_optional(&state.db)
508 .await
509 .unwrap_or(None);
510 let secret_key_bytes = match key_row {
511 Some((key_bytes, encryption_version)) => {
512 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
513 Ok(k) => k,
514 Err(e) => {
515 error!("Error decrypting key for reactivated account: {:?}", e);
516 return (
517 StatusCode::INTERNAL_SERVER_ERROR,
518 Json(json!({"error": "InternalError"})),
519 )
520 .into_response();
521 }
522 }
523 }
524 None => {
525 error!("No signing key found for reactivated account");
526 return (
527 StatusCode::INTERNAL_SERVER_ERROR,
528 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
529 )
530 .into_response();
531 }
532 };
533 let access_meta =
534 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
535 Ok(m) => m,
536 Err(e) => {
537 error!("Error creating access token: {:?}", e);
538 return (
539 StatusCode::INTERNAL_SERVER_ERROR,
540 Json(json!({"error": "InternalError"})),
541 )
542 .into_response();
543 }
544 };
545 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
546 &did,
547 &secret_key_bytes,
548 ) {
549 Ok(m) => m,
550 Err(e) => {
551 error!("Error creating refresh token: {:?}", e);
552 return (
553 StatusCode::INTERNAL_SERVER_ERROR,
554 Json(json!({"error": "InternalError"})),
555 )
556 .into_response();
557 }
558 };
559 let session_result: Result<_, sqlx::Error> = sqlx::query(
560 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
561 )
562 .bind(&did)
563 .bind(&access_meta.jti)
564 .bind(&refresh_meta.jti)
565 .bind(access_meta.expires_at)
566 .bind(refresh_meta.expires_at)
567 .execute(&state.db)
568 .await;
569 if let Err(e) = session_result {
570 error!("Error creating session: {:?}", e);
571 return (
572 StatusCode::INTERNAL_SERVER_ERROR,
573 Json(json!({"error": "InternalError"})),
574 )
575 .into_response();
576 }
577 return (
578 StatusCode::OK,
579 Json(CreateAccountOutput {
580 handle: handle.clone(),
581 did,
582 access_jwt: Some(access_meta.token),
583 refresh_jwt: Some(refresh_meta.token),
584 verification_required: false,
585 verification_channel: "email".to_string(),
586 }),
587 )
588 .into_response();
589 } else {
590 return (
591 StatusCode::BAD_REQUEST,
592 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
593 )
594 .into_response();
595 }
596 }
597 }
598 let exists_result: Option<(i32,)> =
599 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
600 .bind(&handle)
601 .fetch_optional(&mut *tx)
602 .await
603 .unwrap_or(None);
604 if exists_result.is_some() {
605 return (
606 StatusCode::BAD_REQUEST,
607 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
608 )
609 .into_response();
610 }
611 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
612 .map(|v| v == "true" || v == "1")
613 .unwrap_or(false);
614 if invite_code_required
615 && input
616 .invite_code
617 .as_ref()
618 .map(|c| c.trim().is_empty())
619 .unwrap_or(true)
620 {
621 return (
622 StatusCode::BAD_REQUEST,
623 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
624 )
625 .into_response();
626 }
627 if let Some(code) = &input.invite_code
628 && !code.trim().is_empty()
629 {
630 let invite_query = sqlx::query!(
631 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
632 code
633 )
634 .fetch_optional(&mut *tx)
635 .await;
636 match invite_query {
637 Ok(Some(row)) => {
638 if row.available_uses <= 0 {
639 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
640 }
641 let update_invite = sqlx::query!(
642 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
643 code
644 )
645 .execute(&mut *tx)
646 .await;
647 if let Err(e) = update_invite {
648 error!("Error updating invite code: {:?}", e);
649 return (
650 StatusCode::INTERNAL_SERVER_ERROR,
651 Json(json!({"error": "InternalError"})),
652 )
653 .into_response();
654 }
655 }
656 Ok(None) => {
657 return (
658 StatusCode::BAD_REQUEST,
659 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
660 )
661 .into_response();
662 }
663 Err(e) => {
664 error!("Error checking invite code: {:?}", e);
665 return (
666 StatusCode::INTERNAL_SERVER_ERROR,
667 Json(json!({"error": "InternalError"})),
668 )
669 .into_response();
670 }
671 }
672 }
673 let password_hash = match hash(&input.password, DEFAULT_COST) {
674 Ok(h) => h,
675 Err(e) => {
676 error!("Error hashing password: {:?}", e);
677 return (
678 StatusCode::INTERNAL_SERVER_ERROR,
679 Json(json!({"error": "InternalError"})),
680 )
681 .into_response();
682 }
683 };
684 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
685 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
686 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
687 .fetch_one(&mut *tx)
688 .await
689 .map(|c| c.unwrap_or(0) == 0)
690 .unwrap_or(false);
691 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
692 Some(chrono::Utc::now())
693 } else {
694 None
695 };
696 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
697 r#"INSERT INTO users (
698 handle, email, did, password_hash,
699 preferred_comms_channel,
700 discord_id, telegram_username, signal_number,
701 is_admin, deactivated_at, email_verified
702 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
703 )
704 .bind(&handle)
705 .bind(&email)
706 .bind(&did)
707 .bind(&password_hash)
708 .bind(verification_channel)
709 .bind(
710 input
711 .discord_id
712 .as_deref()
713 .map(|s| s.trim())
714 .filter(|s| !s.is_empty()),
715 )
716 .bind(
717 input
718 .telegram_username
719 .as_deref()
720 .map(|s| s.trim())
721 .filter(|s| !s.is_empty()),
722 )
723 .bind(
724 input
725 .signal_number
726 .as_deref()
727 .map(|s| s.trim())
728 .filter(|s| !s.is_empty()),
729 )
730 .bind(is_first_user)
731 .bind(deactivated_at)
732 .bind(is_migration)
733 .fetch_one(&mut *tx)
734 .await;
735 let user_id = match user_insert {
736 Ok((id,)) => id,
737 Err(e) => {
738 if let Some(db_err) = e.as_database_error()
739 && db_err.code().as_deref() == Some("23505")
740 {
741 let constraint = db_err.constraint().unwrap_or("");
742 if constraint.contains("handle") || constraint.contains("users_handle") {
743 return (
744 StatusCode::BAD_REQUEST,
745 Json(json!({
746 "error": "HandleNotAvailable",
747 "message": "Handle already taken"
748 })),
749 )
750 .into_response();
751 } else if constraint.contains("email") || constraint.contains("users_email") {
752 return (
753 StatusCode::BAD_REQUEST,
754 Json(json!({
755 "error": "InvalidEmail",
756 "message": "Email already registered"
757 })),
758 )
759 .into_response();
760 } else if constraint.contains("did") || constraint.contains("users_did") {
761 return (
762 StatusCode::BAD_REQUEST,
763 Json(json!({
764 "error": "AccountAlreadyExists",
765 "message": "An account with this DID already exists"
766 })),
767 )
768 .into_response();
769 }
770 }
771 error!("Error inserting user: {:?}", e);
772 return (
773 StatusCode::INTERNAL_SERVER_ERROR,
774 Json(json!({"error": "InternalError"})),
775 )
776 .into_response();
777 }
778 };
779
780 if !is_migration
781 && let Some(ref recipient) = verification_recipient
782 && let Err(e) = sqlx::query!(
783 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)",
784 user_id,
785 verification_channel as _,
786 verification_code,
787 recipient,
788 code_expires_at
789 )
790 .execute(&mut *tx)
791 .await {
792 error!("Error inserting verification code: {:?}", e);
793 return (
794 StatusCode::INTERNAL_SERVER_ERROR,
795 Json(json!({"error": "InternalError"})),
796 )
797 .into_response();
798 }
799 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
800 Ok(enc) => enc,
801 Err(e) => {
802 error!("Error encrypting user key: {:?}", e);
803 return (
804 StatusCode::INTERNAL_SERVER_ERROR,
805 Json(json!({"error": "InternalError"})),
806 )
807 .into_response();
808 }
809 };
810 let key_insert = sqlx::query!(
811 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
812 user_id,
813 &encrypted_key_bytes[..],
814 crate::config::ENCRYPTION_VERSION
815 )
816 .execute(&mut *tx)
817 .await;
818 if let Err(e) = key_insert {
819 error!("Error inserting user key: {:?}", e);
820 return (
821 StatusCode::INTERNAL_SERVER_ERROR,
822 Json(json!({"error": "InternalError"})),
823 )
824 .into_response();
825 }
826 if let Some(key_id) = reserved_key_id {
827 let mark_used = sqlx::query!(
828 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
829 key_id
830 )
831 .execute(&mut *tx)
832 .await;
833 if let Err(e) = mark_used {
834 error!("Error marking reserved key as used: {:?}", e);
835 return (
836 StatusCode::INTERNAL_SERVER_ERROR,
837 Json(json!({"error": "InternalError"})),
838 )
839 .into_response();
840 }
841 }
842 let mst = Mst::new(Arc::new(state.block_store.clone()));
843 let mst_root = match mst.persist().await {
844 Ok(c) => c,
845 Err(e) => {
846 error!("Error persisting MST: {:?}", e);
847 return (
848 StatusCode::INTERNAL_SERVER_ERROR,
849 Json(json!({"error": "InternalError"})),
850 )
851 .into_response();
852 }
853 };
854 let did_obj = match Did::new(&did) {
855 Ok(d) => d,
856 Err(_) => {
857 return (
858 StatusCode::INTERNAL_SERVER_ERROR,
859 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
860 )
861 .into_response();
862 }
863 };
864 let rev = Tid::now(LimitedU32::MIN);
865 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
866 let signed_commit = match unsigned_commit.sign(&signing_key) {
867 Ok(c) => c,
868 Err(e) => {
869 error!("Error signing genesis commit: {:?}", e);
870 return (
871 StatusCode::INTERNAL_SERVER_ERROR,
872 Json(json!({"error": "InternalError"})),
873 )
874 .into_response();
875 }
876 };
877 let commit_bytes = match signed_commit.to_cbor() {
878 Ok(b) => b,
879 Err(e) => {
880 error!("Error serializing genesis commit: {:?}", e);
881 return (
882 StatusCode::INTERNAL_SERVER_ERROR,
883 Json(json!({"error": "InternalError"})),
884 )
885 .into_response();
886 }
887 };
888 let commit_cid = match state.block_store.put(&commit_bytes).await {
889 Ok(c) => c,
890 Err(e) => {
891 error!("Error saving genesis commit: {:?}", e);
892 return (
893 StatusCode::INTERNAL_SERVER_ERROR,
894 Json(json!({"error": "InternalError"})),
895 )
896 .into_response();
897 }
898 };
899 let commit_cid_str = commit_cid.to_string();
900 let repo_insert = sqlx::query!(
901 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
902 user_id,
903 commit_cid_str
904 )
905 .execute(&mut *tx)
906 .await;
907 if let Err(e) = repo_insert {
908 error!("Error initializing repo: {:?}", e);
909 return (
910 StatusCode::INTERNAL_SERVER_ERROR,
911 Json(json!({"error": "InternalError"})),
912 )
913 .into_response();
914 }
915 if let Some(code) = &input.invite_code
916 && !code.trim().is_empty()
917 {
918 let use_insert = sqlx::query!(
919 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
920 code,
921 user_id
922 )
923 .execute(&mut *tx)
924 .await;
925 if let Err(e) = use_insert {
926 error!("Error recording invite usage: {:?}", e);
927 return (
928 StatusCode::INTERNAL_SERVER_ERROR,
929 Json(json!({"error": "InternalError"})),
930 )
931 .into_response();
932 }
933 }
934 if let Err(e) = tx.commit().await {
935 error!("Error committing transaction: {:?}", e);
936 return (
937 StatusCode::INTERNAL_SERVER_ERROR,
938 Json(json!({"error": "InternalError"})),
939 )
940 .into_response();
941 }
942 if !is_migration {
943 if let Err(e) =
944 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
945 {
946 warn!("Failed to sequence identity event for {}: {}", did, e);
947 }
948 if let Err(e) =
949 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
950 {
951 warn!("Failed to sequence account event for {}: {}", did, e);
952 }
953 let profile_record = json!({
954 "$type": "app.bsky.actor.profile",
955 "displayName": input.handle
956 });
957 if let Err(e) = crate::api::repo::record::create_record_internal(
958 &state,
959 &did,
960 "app.bsky.actor.profile",
961 "self",
962 &profile_record,
963 )
964 .await
965 {
966 warn!("Failed to create default profile for {}: {}", did, e);
967 }
968 if let Some(ref recipient) = verification_recipient
969 && let Err(e) = crate::comms::enqueue_signup_verification(
970 &state.db,
971 user_id,
972 verification_channel,
973 recipient,
974 &verification_code,
975 )
976 .await
977 {
978 warn!(
979 "Failed to enqueue signup verification notification: {:?}",
980 e
981 );
982 }
983 }
984
985 let (access_jwt, refresh_jwt) = if is_migration {
986 let access_meta =
987 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
988 Ok(m) => m,
989 Err(e) => {
990 error!("Error creating access token for migration: {:?}", e);
991 return (
992 StatusCode::INTERNAL_SERVER_ERROR,
993 Json(json!({"error": "InternalError"})),
994 )
995 .into_response();
996 }
997 };
998 let refresh_meta =
999 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1000 Ok(m) => m,
1001 Err(e) => {
1002 error!("Error creating refresh token for migration: {:?}", e);
1003 return (
1004 StatusCode::INTERNAL_SERVER_ERROR,
1005 Json(json!({"error": "InternalError"})),
1006 )
1007 .into_response();
1008 }
1009 };
1010 if let Err(e) = sqlx::query!(
1011 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1012 did,
1013 access_meta.jti,
1014 refresh_meta.jti,
1015 access_meta.expires_at,
1016 refresh_meta.expires_at
1017 )
1018 .execute(&state.db)
1019 .await
1020 {
1021 error!("Error creating session for migration: {:?}", e);
1022 return (
1023 StatusCode::INTERNAL_SERVER_ERROR,
1024 Json(json!({"error": "InternalError"})),
1025 )
1026 .into_response();
1027 }
1028 (Some(access_meta.token), Some(refresh_meta.token))
1029 } else {
1030 (None, None)
1031 };
1032
1033 (
1034 StatusCode::OK,
1035 Json(CreateAccountOutput {
1036 handle: handle.clone(),
1037 did,
1038 access_jwt,
1039 refresh_jwt,
1040 verification_required: !is_migration,
1041 verification_channel: verification_channel.to_string(),
1042 }),
1043 )
1044 .into_response()
1045}