this repo has no description
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use bcrypt::{DEFAULT_COST, hash};
12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use k256::{SecretKey, ecdsa::SigningKey};
15use rand::rngs::OsRng;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21fn extract_client_ip(headers: &HeaderMap) -> String {
22 if let Some(forwarded) = headers.get("x-forwarded-for")
23 && let Ok(value) = forwarded.to_str()
24 && let Some(first_ip) = value.split(',').next()
25 {
26 return first_ip.trim().to_string();
27 }
28 if let Some(real_ip) = headers.get("x-real-ip")
29 && let Ok(value) = real_ip.to_str()
30 {
31 return value.trim().to_string();
32 }
33 "unknown".to_string()
34}
35
/// JSON request body accepted by `create_account`.
///
/// Field names are deserialized from camelCase (e.g. `inviteCode`, `didType`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle: either a short name without dots, or a dotted handle
    /// (optionally ending in `.<PDS_HOSTNAME>`).
    pub handle: String,
    /// Contact email; trimmed, and treated as absent when blank.
    pub email: Option<String>,
    /// Plain-text password; `create_account` stores only its bcrypt hash.
    pub password: String,
    /// Invite code; required when `INVITE_CODE_REQUIRED` is `true` or `1`.
    pub invite_code: Option<String>,
    /// Caller-supplied DID: a `did:web:` DID, or a `did:plc:` DID for a
    /// service-token-authenticated migration.
    pub did: Option<String>,
    /// DID method selector: `"web"`, `"web-external"`, or anything else
    /// (default `"plc"`) for the PLC / caller-supplied path.
    pub did_type: Option<String>,
    /// `did:key` of a previously reserved signing key to use instead of a
    /// freshly generated one.
    pub signing_key: Option<String>,
    /// One of `email`, `discord`, `telegram`, `signal`; defaults to `email`.
    pub verification_channel: Option<String>,
    /// Discord identifier; required when the verification channel is `discord`.
    pub discord_id: Option<String>,
    /// Telegram username; required when the verification channel is `telegram`.
    pub telegram_username: Option<String>,
    /// Signal phone number; required when the verification channel is `signal`.
    pub signal_number: Option<String>,
}
51
/// JSON response body returned by `create_account` on success.
///
/// Field names are serialized in camelCase (e.g. `accessJwt`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Fully-qualified handle assigned to the account.
    pub handle: String,
    /// DID of the created (or re-prepared) account.
    pub did: String,
    /// Access token; omitted from the JSON when `None`. `create_account` only
    /// sets it for migrations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub access_jwt: Option<String>,
    /// Refresh token; omitted from the JSON when `None`. `create_account` only
    /// sets it for migrations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_jwt: Option<String>,
    /// `true` when the caller must still complete channel verification
    /// (`create_account` sets this to `false` for migrations).
    pub verification_required: bool,
    /// Channel name associated with verification (e.g. `email`).
    pub verification_channel: String,
}
64
65pub async fn create_account(
66 State(state): State<AppState>,
67 headers: HeaderMap,
68 Json(input): Json<CreateAccountInput>,
69) -> Response {
70 info!("create_account called");
71 let client_ip = extract_client_ip(&headers);
72 if !state
73 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
74 .await
75 {
76 warn!(ip = %client_ip, "Account creation rate limit exceeded");
77 return (
78 StatusCode::TOO_MANY_REQUESTS,
79 Json(json!({
80 "error": "RateLimitExceeded",
81 "message": "Too many account creation attempts. Please try again later."
82 })),
83 )
84 .into_response();
85 }
86
87 let migration_auth = if let Some(token) =
88 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
89 {
90 if is_service_token(&token) {
91 let verifier = ServiceTokenVerifier::new();
92 match verifier
93 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
94 .await
95 {
96 Ok(claims) => {
97 debug!("Service token verified for migration: iss={}", claims.iss);
98 Some(claims.iss)
99 }
100 Err(e) => {
101 error!("Service token verification failed: {:?}", e);
102 return (
103 StatusCode::UNAUTHORIZED,
104 Json(json!({
105 "error": "AuthenticationFailed",
106 "message": format!("Service token verification failed: {}", e)
107 })),
108 )
109 .into_response();
110 }
111 }
112 } else {
113 None
114 }
115 } else {
116 None
117 };
118
119 let is_migration = migration_auth.is_some()
120 && input
121 .did
122 .as_ref()
123 .map(|d| d.starts_with("did:plc:"))
124 .unwrap_or(false);
125
126 if is_migration {
127 let migration_did = input.did.as_ref().unwrap();
128 let auth_did = migration_auth.as_ref().unwrap();
129 if migration_did != auth_did {
130 return (
131 StatusCode::FORBIDDEN,
132 Json(json!({
133 "error": "AuthorizationError",
134 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
135 })),
136 )
137 .into_response();
138 }
139 info!(did = %migration_did, "Processing account migration");
140 }
141
142 let hostname_for_validation = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
143 let pds_suffix = format!(".{}", hostname_for_validation);
144
145 let validated_short_handle = if !input.handle.contains('.') || input.handle.ends_with(&pds_suffix) {
146 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
147 input.handle.strip_suffix(&pds_suffix).unwrap_or(&input.handle)
148 } else {
149 &input.handle
150 };
151 match crate::api::validation::validate_short_handle(handle_to_validate) {
152 Ok(h) => h,
153 Err(e) => {
154 return (
155 StatusCode::BAD_REQUEST,
156 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
157 )
158 .into_response();
159 }
160 }
161 } else {
162 if input.handle.contains(' ') || input.handle.contains('\t') {
163 return (
164 StatusCode::BAD_REQUEST,
165 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
166 )
167 .into_response();
168 }
169 input.handle.to_lowercase()
170 };
171 let email: Option<String> = input
172 .email
173 .as_ref()
174 .map(|e| e.trim().to_string())
175 .filter(|e| !e.is_empty());
176 if let Some(ref email) = email
177 && !crate::api::validation::is_valid_email(email)
178 {
179 return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
182 )
183 .into_response();
184 }
185 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
186 let valid_channels = ["email", "discord", "telegram", "signal"];
187 if !valid_channels.contains(&verification_channel) && !is_migration {
188 return (
189 StatusCode::BAD_REQUEST,
190 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
191 )
192 .into_response();
193 }
194 let verification_recipient = if is_migration {
195 None
196 } else {
197 Some(match verification_channel {
198 "email" => match &input.email {
199 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
200 _ => return (
201 StatusCode::BAD_REQUEST,
202 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
203 ).into_response(),
204 },
205 "discord" => match &input.discord_id {
206 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
207 _ => return (
208 StatusCode::BAD_REQUEST,
209 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
210 ).into_response(),
211 },
212 "telegram" => match &input.telegram_username {
213 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
214 _ => return (
215 StatusCode::BAD_REQUEST,
216 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
217 ).into_response(),
218 },
219 "signal" => match &input.signal_number {
220 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
221 _ => return (
222 StatusCode::BAD_REQUEST,
223 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
224 ).into_response(),
225 },
226 _ => return (
227 StatusCode::BAD_REQUEST,
228 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
229 ).into_response(),
230 })
231 };
232 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
233 let pds_endpoint = format!("https://{}", hostname);
234 let suffix = format!(".{}", hostname);
235 let handle = if input.handle.ends_with(&suffix) {
236 format!("{}.{}", validated_short_handle, hostname)
237 } else if input.handle.contains('.') {
238 validated_short_handle.clone()
239 } else {
240 format!("{}.{}", validated_short_handle, hostname)
241 };
242 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
243 if let Some(signing_key_did) = &input.signing_key {
244 let reserved = sqlx::query!(
245 r#"
246 SELECT id, private_key_bytes
247 FROM reserved_signing_keys
248 WHERE public_key_did_key = $1
249 AND used_at IS NULL
250 AND expires_at > NOW()
251 FOR UPDATE
252 "#,
253 signing_key_did
254 )
255 .fetch_optional(&state.db)
256 .await;
257 match reserved {
258 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
259 Ok(None) => {
260 return (
261 StatusCode::BAD_REQUEST,
262 Json(json!({
263 "error": "InvalidSigningKey",
264 "message": "Signing key not found, already used, or expired"
265 })),
266 )
267 .into_response();
268 }
269 Err(e) => {
270 error!("Error looking up reserved signing key: {:?}", e);
271 return (
272 StatusCode::INTERNAL_SERVER_ERROR,
273 Json(json!({"error": "InternalError"})),
274 )
275 .into_response();
276 }
277 }
278 } else {
279 let secret_key = SecretKey::random(&mut OsRng);
280 (secret_key.to_bytes().to_vec(), None)
281 };
282 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
283 Ok(k) => k,
284 Err(e) => {
285 error!("Error creating signing key: {:?}", e);
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError"})),
289 )
290 .into_response();
291 }
292 };
293 let did_type = input.did_type.as_deref().unwrap_or("plc");
294 let did = match did_type {
295 "web" => {
296 let subdomain_host = format!("{}.{}", input.handle, hostname);
297 let encoded_subdomain = subdomain_host.replace(':', "%3A");
298 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
299 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
300 self_hosted_did
301 }
302 "web-external" => {
303 let d = match &input.did {
304 Some(d) if !d.trim().is_empty() => d,
305 _ => {
306 return (
307 StatusCode::BAD_REQUEST,
308 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
309 )
310 .into_response();
311 }
312 };
313 if !d.starts_with("did:web:") {
314 return (
315 StatusCode::BAD_REQUEST,
316 Json(
317 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
318 ),
319 )
320 .into_response();
321 }
322 if let Err(e) = verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await {
323 return (
324 StatusCode::BAD_REQUEST,
325 Json(json!({"error": "InvalidDid", "message": e})),
326 )
327 .into_response();
328 }
329 info!(did = %d, "Creating external did:web account");
330 d.clone()
331 }
332 _ => {
333 if let Some(d) = &input.did {
334 if d.starts_with("did:plc:") && is_migration {
335 info!(did = %d, "Migration with existing did:plc");
336 d.clone()
337 } else if d.starts_with("did:web:") {
338 if let Err(e) = verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await {
339 return (
340 StatusCode::BAD_REQUEST,
341 Json(json!({"error": "InvalidDid", "message": e})),
342 )
343 .into_response();
344 }
345 d.clone()
346 } else if !d.trim().is_empty() {
347 return (
348 StatusCode::BAD_REQUEST,
349 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
350 )
351 .into_response();
352 } else {
353 let rotation_key = std::env::var("PLC_ROTATION_KEY")
354 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
355 let genesis_result = match create_genesis_operation(
356 &signing_key,
357 &rotation_key,
358 &handle,
359 &pds_endpoint,
360 ) {
361 Ok(r) => r,
362 Err(e) => {
363 error!("Error creating PLC genesis operation: {:?}", e);
364 return (
365 StatusCode::INTERNAL_SERVER_ERROR,
366 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
367 )
368 .into_response();
369 }
370 };
371 let plc_client = PlcClient::new(None);
372 if let Err(e) = plc_client
373 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
374 .await
375 {
376 error!("Failed to submit PLC genesis operation: {:?}", e);
377 return (
378 StatusCode::BAD_GATEWAY,
379 Json(json!({
380 "error": "UpstreamError",
381 "message": format!("Failed to register DID with PLC directory: {}", e)
382 })),
383 )
384 .into_response();
385 }
386 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
387 genesis_result.did
388 }
389 } else {
390 let rotation_key = std::env::var("PLC_ROTATION_KEY")
391 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
392 let genesis_result = match create_genesis_operation(
393 &signing_key,
394 &rotation_key,
395 &handle,
396 &pds_endpoint,
397 ) {
398 Ok(r) => r,
399 Err(e) => {
400 error!("Error creating PLC genesis operation: {:?}", e);
401 return (
402 StatusCode::INTERNAL_SERVER_ERROR,
403 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
404 )
405 .into_response();
406 }
407 };
408 let plc_client = PlcClient::new(None);
409 if let Err(e) = plc_client
410 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
411 .await
412 {
413 error!("Failed to submit PLC genesis operation: {:?}", e);
414 return (
415 StatusCode::BAD_GATEWAY,
416 Json(json!({
417 "error": "UpstreamError",
418 "message": format!("Failed to register DID with PLC directory: {}", e)
419 })),
420 )
421 .into_response();
422 }
423 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
424 genesis_result.did
425 }
426 }
427 };
428 let mut tx = match state.db.begin().await {
429 Ok(tx) => tx,
430 Err(e) => {
431 error!("Error starting transaction: {:?}", e);
432 return (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(json!({"error": "InternalError"})),
435 )
436 .into_response();
437 }
438 };
439 if is_migration {
440 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
441 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
442 .bind(&did)
443 .fetch_optional(&mut *tx)
444 .await
445 .unwrap_or(None);
446 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
447 if deactivated_at.is_some() {
448 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
449 let update_result: Result<_, sqlx::Error> =
450 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
451 .bind(&handle)
452 .bind(account_id)
453 .execute(&mut *tx)
454 .await;
455 if let Err(e) = update_result {
456 if let Some(db_err) = e.as_database_error()
457 && db_err
458 .constraint()
459 .map(|c| c.contains("handle"))
460 .unwrap_or(false)
461 {
462 return (
463 StatusCode::BAD_REQUEST,
464 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
465 )
466 .into_response();
467 }
468 error!("Error reactivating account: {:?}", e);
469 return (
470 StatusCode::INTERNAL_SERVER_ERROR,
471 Json(json!({"error": "InternalError"})),
472 )
473 .into_response();
474 }
475 if let Err(e) = tx.commit().await {
476 error!("Error committing reactivation: {:?}", e);
477 return (
478 StatusCode::INTERNAL_SERVER_ERROR,
479 Json(json!({"error": "InternalError"})),
480 )
481 .into_response();
482 }
483 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
484 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
485 )
486 .bind(account_id)
487 .fetch_optional(&state.db)
488 .await
489 .unwrap_or(None);
490 let secret_key_bytes = match key_row {
491 Some((key_bytes, encryption_version)) => {
492 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
493 Ok(k) => k,
494 Err(e) => {
495 error!("Error decrypting key for reactivated account: {:?}", e);
496 return (
497 StatusCode::INTERNAL_SERVER_ERROR,
498 Json(json!({"error": "InternalError"})),
499 )
500 .into_response();
501 }
502 }
503 }
504 None => {
505 error!("No signing key found for reactivated account");
506 return (
507 StatusCode::INTERNAL_SERVER_ERROR,
508 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
509 )
510 .into_response();
511 }
512 };
513 let access_meta =
514 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
515 Ok(m) => m,
516 Err(e) => {
517 error!("Error creating access token: {:?}", e);
518 return (
519 StatusCode::INTERNAL_SERVER_ERROR,
520 Json(json!({"error": "InternalError"})),
521 )
522 .into_response();
523 }
524 };
525 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
526 &did,
527 &secret_key_bytes,
528 ) {
529 Ok(m) => m,
530 Err(e) => {
531 error!("Error creating refresh token: {:?}", e);
532 return (
533 StatusCode::INTERNAL_SERVER_ERROR,
534 Json(json!({"error": "InternalError"})),
535 )
536 .into_response();
537 }
538 };
539 let session_result: Result<_, sqlx::Error> = sqlx::query(
540 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
541 )
542 .bind(&did)
543 .bind(&access_meta.jti)
544 .bind(&refresh_meta.jti)
545 .bind(access_meta.expires_at)
546 .bind(refresh_meta.expires_at)
547 .execute(&state.db)
548 .await;
549 if let Err(e) = session_result {
550 error!("Error creating session: {:?}", e);
551 return (
552 StatusCode::INTERNAL_SERVER_ERROR,
553 Json(json!({"error": "InternalError"})),
554 )
555 .into_response();
556 }
557 return (
558 StatusCode::OK,
559 Json(CreateAccountOutput {
560 handle: handle.clone(),
561 did,
562 access_jwt: Some(access_meta.token),
563 refresh_jwt: Some(refresh_meta.token),
564 verification_required: false,
565 verification_channel: "email".to_string(),
566 }),
567 )
568 .into_response();
569 } else {
570 return (
571 StatusCode::BAD_REQUEST,
572 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
573 )
574 .into_response();
575 }
576 }
577 }
578 let exists_result: Option<(i32,)> =
579 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
580 .bind(&handle)
581 .fetch_optional(&mut *tx)
582 .await
583 .unwrap_or(None);
584 if exists_result.is_some() {
585 return (
586 StatusCode::BAD_REQUEST,
587 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
588 )
589 .into_response();
590 }
591 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
592 .map(|v| v == "true" || v == "1")
593 .unwrap_or(false);
594 if invite_code_required
595 && input
596 .invite_code
597 .as_ref()
598 .map(|c| c.trim().is_empty())
599 .unwrap_or(true)
600 {
601 return (
602 StatusCode::BAD_REQUEST,
603 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
604 )
605 .into_response();
606 }
607 if let Some(code) = &input.invite_code
608 && !code.trim().is_empty()
609 {
610 let invite_query = sqlx::query!(
611 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
612 code
613 )
614 .fetch_optional(&mut *tx)
615 .await;
616 match invite_query {
617 Ok(Some(row)) => {
618 if row.available_uses <= 0 {
619 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
620 }
621 let update_invite = sqlx::query!(
622 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
623 code
624 )
625 .execute(&mut *tx)
626 .await;
627 if let Err(e) = update_invite {
628 error!("Error updating invite code: {:?}", e);
629 return (
630 StatusCode::INTERNAL_SERVER_ERROR,
631 Json(json!({"error": "InternalError"})),
632 )
633 .into_response();
634 }
635 }
636 Ok(None) => {
637 return (
638 StatusCode::BAD_REQUEST,
639 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
640 )
641 .into_response();
642 }
643 Err(e) => {
644 error!("Error checking invite code: {:?}", e);
645 return (
646 StatusCode::INTERNAL_SERVER_ERROR,
647 Json(json!({"error": "InternalError"})),
648 )
649 .into_response();
650 }
651 }
652 }
653 let password_hash = match hash(&input.password, DEFAULT_COST) {
654 Ok(h) => h,
655 Err(e) => {
656 error!("Error hashing password: {:?}", e);
657 return (
658 StatusCode::INTERNAL_SERVER_ERROR,
659 Json(json!({"error": "InternalError"})),
660 )
661 .into_response();
662 }
663 };
664 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
665 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
666 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
667 .fetch_one(&mut *tx)
668 .await
669 .map(|c| c.unwrap_or(0) == 0)
670 .unwrap_or(false);
671 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
672 Some(chrono::Utc::now())
673 } else {
674 None
675 };
676 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
677 r#"INSERT INTO users (
678 handle, email, did, password_hash,
679 preferred_comms_channel,
680 discord_id, telegram_username, signal_number,
681 is_admin, deactivated_at, email_verified
682 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
683 )
684 .bind(&handle)
685 .bind(&email)
686 .bind(&did)
687 .bind(&password_hash)
688 .bind(verification_channel)
689 .bind(
690 input
691 .discord_id
692 .as_deref()
693 .map(|s| s.trim())
694 .filter(|s| !s.is_empty()),
695 )
696 .bind(
697 input
698 .telegram_username
699 .as_deref()
700 .map(|s| s.trim())
701 .filter(|s| !s.is_empty()),
702 )
703 .bind(
704 input
705 .signal_number
706 .as_deref()
707 .map(|s| s.trim())
708 .filter(|s| !s.is_empty()),
709 )
710 .bind(is_first_user)
711 .bind(deactivated_at)
712 .bind(is_migration)
713 .fetch_one(&mut *tx)
714 .await;
715 let user_id = match user_insert {
716 Ok((id,)) => id,
717 Err(e) => {
718 if let Some(db_err) = e.as_database_error()
719 && db_err.code().as_deref() == Some("23505")
720 {
721 let constraint = db_err.constraint().unwrap_or("");
722 if constraint.contains("handle") || constraint.contains("users_handle") {
723 return (
724 StatusCode::BAD_REQUEST,
725 Json(json!({
726 "error": "HandleNotAvailable",
727 "message": "Handle already taken"
728 })),
729 )
730 .into_response();
731 } else if constraint.contains("email") || constraint.contains("users_email") {
732 return (
733 StatusCode::BAD_REQUEST,
734 Json(json!({
735 "error": "InvalidEmail",
736 "message": "Email already registered"
737 })),
738 )
739 .into_response();
740 } else if constraint.contains("did") || constraint.contains("users_did") {
741 return (
742 StatusCode::BAD_REQUEST,
743 Json(json!({
744 "error": "AccountAlreadyExists",
745 "message": "An account with this DID already exists"
746 })),
747 )
748 .into_response();
749 }
750 }
751 error!("Error inserting user: {:?}", e);
752 return (
753 StatusCode::INTERNAL_SERVER_ERROR,
754 Json(json!({"error": "InternalError"})),
755 )
756 .into_response();
757 }
758 };
759
760 if !is_migration
761 && let Err(e) = sqlx::query!(
762 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)",
763 user_id,
764 verification_code,
765 email,
766 code_expires_at
767 )
768 .execute(&mut *tx)
769 .await {
770 error!("Error inserting verification code: {:?}", e);
771 return (
772 StatusCode::INTERNAL_SERVER_ERROR,
773 Json(json!({"error": "InternalError"})),
774 )
775 .into_response();
776 }
777 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
778 Ok(enc) => enc,
779 Err(e) => {
780 error!("Error encrypting user key: {:?}", e);
781 return (
782 StatusCode::INTERNAL_SERVER_ERROR,
783 Json(json!({"error": "InternalError"})),
784 )
785 .into_response();
786 }
787 };
788 let key_insert = sqlx::query!(
789 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
790 user_id,
791 &encrypted_key_bytes[..],
792 crate::config::ENCRYPTION_VERSION
793 )
794 .execute(&mut *tx)
795 .await;
796 if let Err(e) = key_insert {
797 error!("Error inserting user key: {:?}", e);
798 return (
799 StatusCode::INTERNAL_SERVER_ERROR,
800 Json(json!({"error": "InternalError"})),
801 )
802 .into_response();
803 }
804 if let Some(key_id) = reserved_key_id {
805 let mark_used = sqlx::query!(
806 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
807 key_id
808 )
809 .execute(&mut *tx)
810 .await;
811 if let Err(e) = mark_used {
812 error!("Error marking reserved key as used: {:?}", e);
813 return (
814 StatusCode::INTERNAL_SERVER_ERROR,
815 Json(json!({"error": "InternalError"})),
816 )
817 .into_response();
818 }
819 }
820 let mst = Mst::new(Arc::new(state.block_store.clone()));
821 let mst_root = match mst.persist().await {
822 Ok(c) => c,
823 Err(e) => {
824 error!("Error persisting MST: {:?}", e);
825 return (
826 StatusCode::INTERNAL_SERVER_ERROR,
827 Json(json!({"error": "InternalError"})),
828 )
829 .into_response();
830 }
831 };
832 let did_obj = match Did::new(&did) {
833 Ok(d) => d,
834 Err(_) => {
835 return (
836 StatusCode::INTERNAL_SERVER_ERROR,
837 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
838 )
839 .into_response();
840 }
841 };
842 let rev = Tid::now(LimitedU32::MIN);
843 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
844 let signed_commit = match unsigned_commit.sign(&signing_key) {
845 Ok(c) => c,
846 Err(e) => {
847 error!("Error signing genesis commit: {:?}", e);
848 return (
849 StatusCode::INTERNAL_SERVER_ERROR,
850 Json(json!({"error": "InternalError"})),
851 )
852 .into_response();
853 }
854 };
855 let commit_bytes = match signed_commit.to_cbor() {
856 Ok(b) => b,
857 Err(e) => {
858 error!("Error serializing genesis commit: {:?}", e);
859 return (
860 StatusCode::INTERNAL_SERVER_ERROR,
861 Json(json!({"error": "InternalError"})),
862 )
863 .into_response();
864 }
865 };
866 let commit_cid = match state.block_store.put(&commit_bytes).await {
867 Ok(c) => c,
868 Err(e) => {
869 error!("Error saving genesis commit: {:?}", e);
870 return (
871 StatusCode::INTERNAL_SERVER_ERROR,
872 Json(json!({"error": "InternalError"})),
873 )
874 .into_response();
875 }
876 };
877 let commit_cid_str = commit_cid.to_string();
878 let repo_insert = sqlx::query!(
879 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
880 user_id,
881 commit_cid_str
882 )
883 .execute(&mut *tx)
884 .await;
885 if let Err(e) = repo_insert {
886 error!("Error initializing repo: {:?}", e);
887 return (
888 StatusCode::INTERNAL_SERVER_ERROR,
889 Json(json!({"error": "InternalError"})),
890 )
891 .into_response();
892 }
893 if let Some(code) = &input.invite_code
894 && !code.trim().is_empty()
895 {
896 let use_insert = sqlx::query!(
897 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
898 code,
899 user_id
900 )
901 .execute(&mut *tx)
902 .await;
903 if let Err(e) = use_insert {
904 error!("Error recording invite usage: {:?}", e);
905 return (
906 StatusCode::INTERNAL_SERVER_ERROR,
907 Json(json!({"error": "InternalError"})),
908 )
909 .into_response();
910 }
911 }
912 if let Err(e) = tx.commit().await {
913 error!("Error committing transaction: {:?}", e);
914 return (
915 StatusCode::INTERNAL_SERVER_ERROR,
916 Json(json!({"error": "InternalError"})),
917 )
918 .into_response();
919 }
920 if !is_migration {
921 if let Err(e) =
922 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle))
923 .await
924 {
925 warn!("Failed to sequence identity event for {}: {}", did, e);
926 }
927 if let Err(e) =
928 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
929 {
930 warn!("Failed to sequence account event for {}: {}", did, e);
931 }
932 let profile_record = json!({
933 "$type": "app.bsky.actor.profile",
934 "displayName": input.handle
935 });
936 if let Err(e) = crate::api::repo::record::create_record_internal(
937 &state,
938 &did,
939 "app.bsky.actor.profile",
940 "self",
941 &profile_record,
942 )
943 .await
944 {
945 warn!("Failed to create default profile for {}: {}", did, e);
946 }
947 if let Some(ref recipient) = verification_recipient
948 && let Err(e) = crate::comms::enqueue_signup_verification(
949 &state.db,
950 user_id,
951 verification_channel,
952 recipient,
953 &verification_code,
954 )
955 .await
956 {
957 warn!(
958 "Failed to enqueue signup verification notification: {:?}",
959 e
960 );
961 }
962 }
963
964 let (access_jwt, refresh_jwt) = if is_migration {
965 let access_meta =
966 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
967 Ok(m) => m,
968 Err(e) => {
969 error!("Error creating access token for migration: {:?}", e);
970 return (
971 StatusCode::INTERNAL_SERVER_ERROR,
972 Json(json!({"error": "InternalError"})),
973 )
974 .into_response();
975 }
976 };
977 let refresh_meta =
978 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
979 Ok(m) => m,
980 Err(e) => {
981 error!("Error creating refresh token for migration: {:?}", e);
982 return (
983 StatusCode::INTERNAL_SERVER_ERROR,
984 Json(json!({"error": "InternalError"})),
985 )
986 .into_response();
987 }
988 };
989 if let Err(e) = sqlx::query!(
990 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
991 did,
992 access_meta.jti,
993 refresh_meta.jti,
994 access_meta.expires_at,
995 refresh_meta.expires_at
996 )
997 .execute(&state.db)
998 .await
999 {
1000 error!("Error creating session for migration: {:?}", e);
1001 return (
1002 StatusCode::INTERNAL_SERVER_ERROR,
1003 Json(json!({"error": "InternalError"})),
1004 )
1005 .into_response();
1006 }
1007 (Some(access_meta.token), Some(refresh_meta.token))
1008 } else {
1009 (None, None)
1010 };
1011
1012 (
1013 StatusCode::OK,
1014 Json(CreateAccountOutput {
1015 handle: handle.clone(),
1016 did,
1017 access_jwt,
1018 refresh_jwt,
1019 verification_required: !is_migration,
1020 verification_channel: verification_channel.to_string(),
1021 }),
1022 )
1023 .into_response()
1024}