this repo has no description
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use crate::validation::validate_password;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 response::{IntoResponse, Response},
11};
12use bcrypt::{DEFAULT_COST, hash};
13use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
15use k256::{SecretKey, ecdsa::SigningKey};
16use rand::rngs::OsRng;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::sync::Arc;
20use tracing::{debug, error, info, warn};
21
22fn extract_client_ip(headers: &HeaderMap) -> String {
23 if let Some(forwarded) = headers.get("x-forwarded-for")
24 && let Ok(value) = forwarded.to_str()
25 && let Some(first_ip) = value.split(',').next()
26 {
27 return first_ip.trim().to_string();
28 }
29 if let Some(real_ip) = headers.get("x-real-ip")
30 && let Ok(value) = real_ip.to_str()
31 {
32 return value.trim().to_string();
33 }
34 "unknown".to_string()
35}
36
/// JSON request body accepted by `create_account`.
///
/// Field names are deserialized from camelCase (e.g. `inviteCode`, `didType`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle: either a short name without dots, a name ending in
    /// `.<PDS_HOSTNAME>`, or any other dotted handle.
    pub handle: String,
    /// Email address; trimmed before use, and a blank value is treated as absent.
    pub email: Option<String>,
    /// Account password; checked with `validate_password` and stored as a bcrypt hash.
    pub password: String,
    /// Invite code; mandatory when the `INVITE_CODE_REQUIRED` env var is `true` or `1`.
    pub invite_code: Option<String>,
    /// Caller-supplied DID (`did:web:...`, or `did:plc:...` for a migration).
    pub did: Option<String>,
    /// DID flavour: `"web"`, `"web-external"`, or anything else / absent for the
    /// default (`"plc"`) handling.
    pub did_type: Option<String>,
    /// `did:key` of a previously reserved signing key to use instead of a fresh one.
    pub signing_key: Option<String>,
    /// One of `email`, `discord`, `telegram`, `signal`; defaults to `email`.
    pub verification_channel: Option<String>,
    /// Discord identifier; required when the verification channel is `discord`.
    pub discord_id: Option<String>,
    /// Telegram username; required when the verification channel is `telegram`.
    pub telegram_username: Option<String>,
    /// Signal phone number; required when the verification channel is `signal`.
    pub signal_number: Option<String>,
}
52
/// JSON response body returned by `create_account` on success.
///
/// Field names are serialized as camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Full handle assigned to the account.
    pub handle: String,
    /// DID of the created (or re-used) account.
    pub did: String,
    /// Access token; omitted from the JSON when `None`. `create_account` only
    /// sets it for migration requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub access_jwt: Option<String>,
    /// Refresh token; omitted from the JSON when `None`. `create_account` only
    /// sets it for migration requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_jwt: Option<String>,
    /// Whether the caller still has to confirm a verification code.
    pub verification_required: bool,
    /// Channel the verification code is associated with.
    pub verification_channel: String,
}
65
66pub async fn create_account(
67 State(state): State<AppState>,
68 headers: HeaderMap,
69 Json(input): Json<CreateAccountInput>,
70) -> Response {
71 info!("create_account called");
72 let client_ip = extract_client_ip(&headers);
73 if !state
74 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
75 .await
76 {
77 warn!(ip = %client_ip, "Account creation rate limit exceeded");
78 return (
79 StatusCode::TOO_MANY_REQUESTS,
80 Json(json!({
81 "error": "RateLimitExceeded",
82 "message": "Too many account creation attempts. Please try again later."
83 })),
84 )
85 .into_response();
86 }
87
88 let migration_auth = if let Some(token) =
89 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
90 {
91 if is_service_token(&token) {
92 let verifier = ServiceTokenVerifier::new();
93 match verifier
94 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
95 .await
96 {
97 Ok(claims) => {
98 debug!("Service token verified for migration: iss={}", claims.iss);
99 Some(claims.iss)
100 }
101 Err(e) => {
102 error!("Service token verification failed: {:?}", e);
103 return (
104 StatusCode::UNAUTHORIZED,
105 Json(json!({
106 "error": "AuthenticationFailed",
107 "message": format!("Service token verification failed: {}", e)
108 })),
109 )
110 .into_response();
111 }
112 }
113 } else {
114 None
115 }
116 } else {
117 None
118 };
119
120 let is_migration = migration_auth.is_some()
121 && input
122 .did
123 .as_ref()
124 .map(|d| d.starts_with("did:plc:"))
125 .unwrap_or(false);
126
127 if is_migration {
128 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
129 {
130 if migration_did != auth_did {
131 return (
132 StatusCode::FORBIDDEN,
133 Json(json!({
134 "error": "AuthorizationError",
135 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
136 })),
137 )
138 .into_response();
139 }
140 info!(did = %migration_did, "Processing account migration");
141 }
142 }
143
144 let hostname_for_validation =
145 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
146 let pds_suffix = format!(".{}", hostname_for_validation);
147
148 let validated_short_handle = if !input.handle.contains('.')
149 || input.handle.ends_with(&pds_suffix)
150 {
151 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
152 input
153 .handle
154 .strip_suffix(&pds_suffix)
155 .unwrap_or(&input.handle)
156 } else {
157 &input.handle
158 };
159 match crate::api::validation::validate_short_handle(handle_to_validate) {
160 Ok(h) => h,
161 Err(e) => {
162 return (
163 StatusCode::BAD_REQUEST,
164 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
165 )
166 .into_response();
167 }
168 }
169 } else {
170 if input.handle.contains(' ') || input.handle.contains('\t') {
171 return (
172 StatusCode::BAD_REQUEST,
173 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
174 )
175 .into_response();
176 }
177 for c in input.handle.chars() {
178 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
179 return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
182 )
183 .into_response();
184 }
185 }
186 input.handle.to_lowercase()
187 };
188 let email: Option<String> = input
189 .email
190 .as_ref()
191 .map(|e| e.trim().to_string())
192 .filter(|e| !e.is_empty());
193 if let Some(ref email) = email
194 && !crate::api::validation::is_valid_email(email)
195 {
196 return (
197 StatusCode::BAD_REQUEST,
198 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
199 )
200 .into_response();
201 }
202 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
203 let valid_channels = ["email", "discord", "telegram", "signal"];
204 if !valid_channels.contains(&verification_channel) && !is_migration {
205 return (
206 StatusCode::BAD_REQUEST,
207 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
208 )
209 .into_response();
210 }
211 let verification_recipient = if is_migration {
212 None
213 } else {
214 Some(match verification_channel {
215 "email" => match &input.email {
216 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
217 _ => return (
218 StatusCode::BAD_REQUEST,
219 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
220 ).into_response(),
221 },
222 "discord" => match &input.discord_id {
223 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
224 _ => return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
227 ).into_response(),
228 },
229 "telegram" => match &input.telegram_username {
230 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
231 _ => return (
232 StatusCode::BAD_REQUEST,
233 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
234 ).into_response(),
235 },
236 "signal" => match &input.signal_number {
237 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
238 _ => return (
239 StatusCode::BAD_REQUEST,
240 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
241 ).into_response(),
242 },
243 _ => return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
246 ).into_response(),
247 })
248 };
249 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
250 let pds_endpoint = format!("https://{}", hostname);
251 let suffix = format!(".{}", hostname);
252 let handle = if input.handle.ends_with(&suffix) {
253 format!("{}.{}", validated_short_handle, hostname)
254 } else if input.handle.contains('.') {
255 validated_short_handle.clone()
256 } else {
257 format!("{}.{}", validated_short_handle, hostname)
258 };
259 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
260 if let Some(signing_key_did) = &input.signing_key {
261 let reserved = sqlx::query!(
262 r#"
263 SELECT id, private_key_bytes
264 FROM reserved_signing_keys
265 WHERE public_key_did_key = $1
266 AND used_at IS NULL
267 AND expires_at > NOW()
268 FOR UPDATE
269 "#,
270 signing_key_did
271 )
272 .fetch_optional(&state.db)
273 .await;
274 match reserved {
275 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
276 Ok(None) => {
277 return (
278 StatusCode::BAD_REQUEST,
279 Json(json!({
280 "error": "InvalidSigningKey",
281 "message": "Signing key not found, already used, or expired"
282 })),
283 )
284 .into_response();
285 }
286 Err(e) => {
287 error!("Error looking up reserved signing key: {:?}", e);
288 return (
289 StatusCode::INTERNAL_SERVER_ERROR,
290 Json(json!({"error": "InternalError"})),
291 )
292 .into_response();
293 }
294 }
295 } else {
296 let secret_key = SecretKey::random(&mut OsRng);
297 (secret_key.to_bytes().to_vec(), None)
298 };
299 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
300 Ok(k) => k,
301 Err(e) => {
302 error!("Error creating signing key: {:?}", e);
303 return (
304 StatusCode::INTERNAL_SERVER_ERROR,
305 Json(json!({"error": "InternalError"})),
306 )
307 .into_response();
308 }
309 };
310 let did_type = input.did_type.as_deref().unwrap_or("plc");
311 let did = match did_type {
312 "web" => {
313 let subdomain_host = format!("{}.{}", input.handle, hostname);
314 let encoded_subdomain = subdomain_host.replace(':', "%3A");
315 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
316 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
317 self_hosted_did
318 }
319 "web-external" => {
320 let d = match &input.did {
321 Some(d) if !d.trim().is_empty() => d,
322 _ => {
323 return (
324 StatusCode::BAD_REQUEST,
325 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
326 )
327 .into_response();
328 }
329 };
330 if !d.starts_with("did:web:") {
331 return (
332 StatusCode::BAD_REQUEST,
333 Json(
334 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
335 ),
336 )
337 .into_response();
338 }
339 if let Err(e) =
340 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
341 {
342 return (
343 StatusCode::BAD_REQUEST,
344 Json(json!({"error": "InvalidDid", "message": e})),
345 )
346 .into_response();
347 }
348 info!(did = %d, "Creating external did:web account");
349 d.clone()
350 }
351 _ => {
352 if let Some(d) = &input.did {
353 if d.starts_with("did:plc:") && is_migration {
354 info!(did = %d, "Migration with existing did:plc");
355 d.clone()
356 } else if d.starts_with("did:web:") {
357 if let Err(e) =
358 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref())
359 .await
360 {
361 return (
362 StatusCode::BAD_REQUEST,
363 Json(json!({"error": "InvalidDid", "message": e})),
364 )
365 .into_response();
366 }
367 d.clone()
368 } else if !d.trim().is_empty() {
369 return (
370 StatusCode::BAD_REQUEST,
371 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
372 )
373 .into_response();
374 } else {
375 let rotation_key = std::env::var("PLC_ROTATION_KEY")
376 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
377 let genesis_result = match create_genesis_operation(
378 &signing_key,
379 &rotation_key,
380 &handle,
381 &pds_endpoint,
382 ) {
383 Ok(r) => r,
384 Err(e) => {
385 error!("Error creating PLC genesis operation: {:?}", e);
386 return (
387 StatusCode::INTERNAL_SERVER_ERROR,
388 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
389 )
390 .into_response();
391 }
392 };
393 let plc_client = PlcClient::new(None);
394 if let Err(e) = plc_client
395 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
396 .await
397 {
398 error!("Failed to submit PLC genesis operation: {:?}", e);
399 return (
400 StatusCode::BAD_GATEWAY,
401 Json(json!({
402 "error": "UpstreamError",
403 "message": format!("Failed to register DID with PLC directory: {}", e)
404 })),
405 )
406 .into_response();
407 }
408 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
409 genesis_result.did
410 }
411 } else {
412 let rotation_key = std::env::var("PLC_ROTATION_KEY")
413 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
414 let genesis_result = match create_genesis_operation(
415 &signing_key,
416 &rotation_key,
417 &handle,
418 &pds_endpoint,
419 ) {
420 Ok(r) => r,
421 Err(e) => {
422 error!("Error creating PLC genesis operation: {:?}", e);
423 return (
424 StatusCode::INTERNAL_SERVER_ERROR,
425 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
426 )
427 .into_response();
428 }
429 };
430 let plc_client = PlcClient::new(None);
431 if let Err(e) = plc_client
432 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
433 .await
434 {
435 error!("Failed to submit PLC genesis operation: {:?}", e);
436 return (
437 StatusCode::BAD_GATEWAY,
438 Json(json!({
439 "error": "UpstreamError",
440 "message": format!("Failed to register DID with PLC directory: {}", e)
441 })),
442 )
443 .into_response();
444 }
445 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
446 genesis_result.did
447 }
448 }
449 };
450 let mut tx = match state.db.begin().await {
451 Ok(tx) => tx,
452 Err(e) => {
453 error!("Error starting transaction: {:?}", e);
454 return (
455 StatusCode::INTERNAL_SERVER_ERROR,
456 Json(json!({"error": "InternalError"})),
457 )
458 .into_response();
459 }
460 };
461 if is_migration {
462 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
463 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
464 .bind(&did)
465 .fetch_optional(&mut *tx)
466 .await
467 .unwrap_or(None);
468 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
469 if deactivated_at.is_some() {
470 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
471 let update_result: Result<_, sqlx::Error> =
472 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
473 .bind(&handle)
474 .bind(account_id)
475 .execute(&mut *tx)
476 .await;
477 if let Err(e) = update_result {
478 if let Some(db_err) = e.as_database_error()
479 && db_err
480 .constraint()
481 .map(|c| c.contains("handle"))
482 .unwrap_or(false)
483 {
484 return (
485 StatusCode::BAD_REQUEST,
486 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
487 )
488 .into_response();
489 }
490 error!("Error reactivating account: {:?}", e);
491 return (
492 StatusCode::INTERNAL_SERVER_ERROR,
493 Json(json!({"error": "InternalError"})),
494 )
495 .into_response();
496 }
497 if let Err(e) = tx.commit().await {
498 error!("Error committing reactivation: {:?}", e);
499 return (
500 StatusCode::INTERNAL_SERVER_ERROR,
501 Json(json!({"error": "InternalError"})),
502 )
503 .into_response();
504 }
505 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
506 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
507 )
508 .bind(account_id)
509 .fetch_optional(&state.db)
510 .await
511 .unwrap_or(None);
512 let secret_key_bytes = match key_row {
513 Some((key_bytes, encryption_version)) => {
514 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
515 Ok(k) => k,
516 Err(e) => {
517 error!("Error decrypting key for reactivated account: {:?}", e);
518 return (
519 StatusCode::INTERNAL_SERVER_ERROR,
520 Json(json!({"error": "InternalError"})),
521 )
522 .into_response();
523 }
524 }
525 }
526 None => {
527 error!("No signing key found for reactivated account");
528 return (
529 StatusCode::INTERNAL_SERVER_ERROR,
530 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
531 )
532 .into_response();
533 }
534 };
535 let access_meta =
536 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
537 Ok(m) => m,
538 Err(e) => {
539 error!("Error creating access token: {:?}", e);
540 return (
541 StatusCode::INTERNAL_SERVER_ERROR,
542 Json(json!({"error": "InternalError"})),
543 )
544 .into_response();
545 }
546 };
547 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
548 &did,
549 &secret_key_bytes,
550 ) {
551 Ok(m) => m,
552 Err(e) => {
553 error!("Error creating refresh token: {:?}", e);
554 return (
555 StatusCode::INTERNAL_SERVER_ERROR,
556 Json(json!({"error": "InternalError"})),
557 )
558 .into_response();
559 }
560 };
561 let session_result: Result<_, sqlx::Error> = sqlx::query(
562 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
563 )
564 .bind(&did)
565 .bind(&access_meta.jti)
566 .bind(&refresh_meta.jti)
567 .bind(access_meta.expires_at)
568 .bind(refresh_meta.expires_at)
569 .execute(&state.db)
570 .await;
571 if let Err(e) = session_result {
572 error!("Error creating session: {:?}", e);
573 return (
574 StatusCode::INTERNAL_SERVER_ERROR,
575 Json(json!({"error": "InternalError"})),
576 )
577 .into_response();
578 }
579 return (
580 StatusCode::OK,
581 Json(CreateAccountOutput {
582 handle: handle.clone(),
583 did,
584 access_jwt: Some(access_meta.token),
585 refresh_jwt: Some(refresh_meta.token),
586 verification_required: false,
587 verification_channel: "email".to_string(),
588 }),
589 )
590 .into_response();
591 } else {
592 return (
593 StatusCode::BAD_REQUEST,
594 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
595 )
596 .into_response();
597 }
598 }
599 }
600 let exists_result: Option<(i32,)> =
601 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
602 .bind(&handle)
603 .fetch_optional(&mut *tx)
604 .await
605 .unwrap_or(None);
606 if exists_result.is_some() {
607 return (
608 StatusCode::BAD_REQUEST,
609 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
610 )
611 .into_response();
612 }
613 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
614 .map(|v| v == "true" || v == "1")
615 .unwrap_or(false);
616 if invite_code_required
617 && input
618 .invite_code
619 .as_ref()
620 .map(|c| c.trim().is_empty())
621 .unwrap_or(true)
622 {
623 return (
624 StatusCode::BAD_REQUEST,
625 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
626 )
627 .into_response();
628 }
629 if let Some(code) = &input.invite_code
630 && !code.trim().is_empty()
631 {
632 let invite_query = sqlx::query!(
633 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
634 code
635 )
636 .fetch_optional(&mut *tx)
637 .await;
638 match invite_query {
639 Ok(Some(row)) => {
640 if row.available_uses <= 0 {
641 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
642 }
643 let update_invite = sqlx::query!(
644 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
645 code
646 )
647 .execute(&mut *tx)
648 .await;
649 if let Err(e) = update_invite {
650 error!("Error updating invite code: {:?}", e);
651 return (
652 StatusCode::INTERNAL_SERVER_ERROR,
653 Json(json!({"error": "InternalError"})),
654 )
655 .into_response();
656 }
657 }
658 Ok(None) => {
659 return (
660 StatusCode::BAD_REQUEST,
661 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
662 )
663 .into_response();
664 }
665 Err(e) => {
666 error!("Error checking invite code: {:?}", e);
667 return (
668 StatusCode::INTERNAL_SERVER_ERROR,
669 Json(json!({"error": "InternalError"})),
670 )
671 .into_response();
672 }
673 }
674 }
675 if let Err(e) = validate_password(&input.password) {
676 return (
677 StatusCode::BAD_REQUEST,
678 Json(json!({
679 "error": "InvalidPassword",
680 "message": e.to_string()
681 })),
682 )
683 .into_response();
684 }
685
686 let password_hash = match hash(&input.password, DEFAULT_COST) {
687 Ok(h) => h,
688 Err(e) => {
689 error!("Error hashing password: {:?}", e);
690 return (
691 StatusCode::INTERNAL_SERVER_ERROR,
692 Json(json!({"error": "InternalError"})),
693 )
694 .into_response();
695 }
696 };
697 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
698 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
699 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
700 .fetch_one(&mut *tx)
701 .await
702 .map(|c| c.unwrap_or(0) == 0)
703 .unwrap_or(false);
704 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
705 Some(chrono::Utc::now())
706 } else {
707 None
708 };
709 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
710 r#"INSERT INTO users (
711 handle, email, did, password_hash,
712 preferred_comms_channel,
713 discord_id, telegram_username, signal_number,
714 is_admin, deactivated_at, email_verified
715 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
716 )
717 .bind(&handle)
718 .bind(&email)
719 .bind(&did)
720 .bind(&password_hash)
721 .bind(verification_channel)
722 .bind(
723 input
724 .discord_id
725 .as_deref()
726 .map(|s| s.trim())
727 .filter(|s| !s.is_empty()),
728 )
729 .bind(
730 input
731 .telegram_username
732 .as_deref()
733 .map(|s| s.trim())
734 .filter(|s| !s.is_empty()),
735 )
736 .bind(
737 input
738 .signal_number
739 .as_deref()
740 .map(|s| s.trim())
741 .filter(|s| !s.is_empty()),
742 )
743 .bind(is_first_user)
744 .bind(deactivated_at)
745 .bind(is_migration)
746 .fetch_one(&mut *tx)
747 .await;
748 let user_id = match user_insert {
749 Ok((id,)) => id,
750 Err(e) => {
751 if let Some(db_err) = e.as_database_error()
752 && db_err.code().as_deref() == Some("23505")
753 {
754 let constraint = db_err.constraint().unwrap_or("");
755 if constraint.contains("handle") || constraint.contains("users_handle") {
756 return (
757 StatusCode::BAD_REQUEST,
758 Json(json!({
759 "error": "HandleNotAvailable",
760 "message": "Handle already taken"
761 })),
762 )
763 .into_response();
764 } else if constraint.contains("email") || constraint.contains("users_email") {
765 return (
766 StatusCode::BAD_REQUEST,
767 Json(json!({
768 "error": "InvalidEmail",
769 "message": "Email already registered"
770 })),
771 )
772 .into_response();
773 } else if constraint.contains("did") || constraint.contains("users_did") {
774 return (
775 StatusCode::BAD_REQUEST,
776 Json(json!({
777 "error": "AccountAlreadyExists",
778 "message": "An account with this DID already exists"
779 })),
780 )
781 .into_response();
782 }
783 }
784 error!("Error inserting user: {:?}", e);
785 return (
786 StatusCode::INTERNAL_SERVER_ERROR,
787 Json(json!({"error": "InternalError"})),
788 )
789 .into_response();
790 }
791 };
792
793 if !is_migration
794 && let Some(ref recipient) = verification_recipient
795 && let Err(e) = sqlx::query!(
796 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)",
797 user_id,
798 verification_channel as _,
799 verification_code,
800 recipient,
801 code_expires_at
802 )
803 .execute(&mut *tx)
804 .await {
805 error!("Error inserting verification code: {:?}", e);
806 return (
807 StatusCode::INTERNAL_SERVER_ERROR,
808 Json(json!({"error": "InternalError"})),
809 )
810 .into_response();
811 }
812 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
813 Ok(enc) => enc,
814 Err(e) => {
815 error!("Error encrypting user key: {:?}", e);
816 return (
817 StatusCode::INTERNAL_SERVER_ERROR,
818 Json(json!({"error": "InternalError"})),
819 )
820 .into_response();
821 }
822 };
823 let key_insert = sqlx::query!(
824 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
825 user_id,
826 &encrypted_key_bytes[..],
827 crate::config::ENCRYPTION_VERSION
828 )
829 .execute(&mut *tx)
830 .await;
831 if let Err(e) = key_insert {
832 error!("Error inserting user key: {:?}", e);
833 return (
834 StatusCode::INTERNAL_SERVER_ERROR,
835 Json(json!({"error": "InternalError"})),
836 )
837 .into_response();
838 }
839 if let Some(key_id) = reserved_key_id {
840 let mark_used = sqlx::query!(
841 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
842 key_id
843 )
844 .execute(&mut *tx)
845 .await;
846 if let Err(e) = mark_used {
847 error!("Error marking reserved key as used: {:?}", e);
848 return (
849 StatusCode::INTERNAL_SERVER_ERROR,
850 Json(json!({"error": "InternalError"})),
851 )
852 .into_response();
853 }
854 }
855 let mst = Mst::new(Arc::new(state.block_store.clone()));
856 let mst_root = match mst.persist().await {
857 Ok(c) => c,
858 Err(e) => {
859 error!("Error persisting MST: {:?}", e);
860 return (
861 StatusCode::INTERNAL_SERVER_ERROR,
862 Json(json!({"error": "InternalError"})),
863 )
864 .into_response();
865 }
866 };
867 let did_obj = match Did::new(&did) {
868 Ok(d) => d,
869 Err(_) => {
870 return (
871 StatusCode::INTERNAL_SERVER_ERROR,
872 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
873 )
874 .into_response();
875 }
876 };
877 let rev = Tid::now(LimitedU32::MIN);
878 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
879 let signed_commit = match unsigned_commit.sign(&signing_key) {
880 Ok(c) => c,
881 Err(e) => {
882 error!("Error signing genesis commit: {:?}", e);
883 return (
884 StatusCode::INTERNAL_SERVER_ERROR,
885 Json(json!({"error": "InternalError"})),
886 )
887 .into_response();
888 }
889 };
890 let commit_bytes = match signed_commit.to_cbor() {
891 Ok(b) => b,
892 Err(e) => {
893 error!("Error serializing genesis commit: {:?}", e);
894 return (
895 StatusCode::INTERNAL_SERVER_ERROR,
896 Json(json!({"error": "InternalError"})),
897 )
898 .into_response();
899 }
900 };
901 let commit_cid = match state.block_store.put(&commit_bytes).await {
902 Ok(c) => c,
903 Err(e) => {
904 error!("Error saving genesis commit: {:?}", e);
905 return (
906 StatusCode::INTERNAL_SERVER_ERROR,
907 Json(json!({"error": "InternalError"})),
908 )
909 .into_response();
910 }
911 };
912 let commit_cid_str = commit_cid.to_string();
913 let repo_insert = sqlx::query!(
914 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
915 user_id,
916 commit_cid_str
917 )
918 .execute(&mut *tx)
919 .await;
920 if let Err(e) = repo_insert {
921 error!("Error initializing repo: {:?}", e);
922 return (
923 StatusCode::INTERNAL_SERVER_ERROR,
924 Json(json!({"error": "InternalError"})),
925 )
926 .into_response();
927 }
928 if let Some(code) = &input.invite_code
929 && !code.trim().is_empty()
930 {
931 let use_insert = sqlx::query!(
932 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
933 code,
934 user_id
935 )
936 .execute(&mut *tx)
937 .await;
938 if let Err(e) = use_insert {
939 error!("Error recording invite usage: {:?}", e);
940 return (
941 StatusCode::INTERNAL_SERVER_ERROR,
942 Json(json!({"error": "InternalError"})),
943 )
944 .into_response();
945 }
946 }
947 if let Err(e) = tx.commit().await {
948 error!("Error committing transaction: {:?}", e);
949 return (
950 StatusCode::INTERNAL_SERVER_ERROR,
951 Json(json!({"error": "InternalError"})),
952 )
953 .into_response();
954 }
955 if !is_migration {
956 if let Err(e) =
957 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
958 {
959 warn!("Failed to sequence identity event for {}: {}", did, e);
960 }
961 if let Err(e) =
962 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
963 {
964 warn!("Failed to sequence account event for {}: {}", did, e);
965 }
966 let profile_record = json!({
967 "$type": "app.bsky.actor.profile",
968 "displayName": input.handle
969 });
970 if let Err(e) = crate::api::repo::record::create_record_internal(
971 &state,
972 &did,
973 "app.bsky.actor.profile",
974 "self",
975 &profile_record,
976 )
977 .await
978 {
979 warn!("Failed to create default profile for {}: {}", did, e);
980 }
981 if let Some(ref recipient) = verification_recipient
982 && let Err(e) = crate::comms::enqueue_signup_verification(
983 &state.db,
984 user_id,
985 verification_channel,
986 recipient,
987 &verification_code,
988 )
989 .await
990 {
991 warn!(
992 "Failed to enqueue signup verification notification: {:?}",
993 e
994 );
995 }
996 }
997
998 let (access_jwt, refresh_jwt) = if is_migration {
999 let access_meta =
1000 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
1001 Ok(m) => m,
1002 Err(e) => {
1003 error!("Error creating access token for migration: {:?}", e);
1004 return (
1005 StatusCode::INTERNAL_SERVER_ERROR,
1006 Json(json!({"error": "InternalError"})),
1007 )
1008 .into_response();
1009 }
1010 };
1011 let refresh_meta =
1012 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1013 Ok(m) => m,
1014 Err(e) => {
1015 error!("Error creating refresh token for migration: {:?}", e);
1016 return (
1017 StatusCode::INTERNAL_SERVER_ERROR,
1018 Json(json!({"error": "InternalError"})),
1019 )
1020 .into_response();
1021 }
1022 };
1023 if let Err(e) = sqlx::query!(
1024 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1025 did,
1026 access_meta.jti,
1027 refresh_meta.jti,
1028 access_meta.expires_at,
1029 refresh_meta.expires_at
1030 )
1031 .execute(&state.db)
1032 .await
1033 {
1034 error!("Error creating session for migration: {:?}", e);
1035 return (
1036 StatusCode::INTERNAL_SERVER_ERROR,
1037 Json(json!({"error": "InternalError"})),
1038 )
1039 .into_response();
1040 }
1041 (Some(access_meta.token), Some(refresh_meta.token))
1042 } else {
1043 (None, None)
1044 };
1045
1046 (
1047 StatusCode::OK,
1048 Json(CreateAccountOutput {
1049 handle: handle.clone(),
1050 did,
1051 access_jwt,
1052 refresh_jwt,
1053 verification_required: !is_migration,
1054 verification_channel: verification_channel.to_string(),
1055 }),
1056 )
1057 .into_response()
1058}