this repo has no description
1use super::did::verify_did_web;
2use crate::api::repo::record::utils::create_signed_commit;
3use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
4use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
5use crate::state::{AppState, RateLimitKind};
6use crate::validation::validate_password;
7use axum::{
8 Json,
9 extract::State,
10 http::{HeaderMap, StatusCode},
11 response::{IntoResponse, Response},
12};
13use bcrypt::{DEFAULT_COST, hash};
14use jacquard::types::{integer::LimitedU32, string::Tid};
15use jacquard_repo::{mst::Mst, storage::BlockStore};
16use k256::{SecretKey, ecdsa::SigningKey};
17use rand::rngs::OsRng;
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use std::sync::Arc;
21use tracing::{debug, error, info, warn};
22
23fn extract_client_ip(headers: &HeaderMap) -> String {
24 if let Some(forwarded) = headers.get("x-forwarded-for")
25 && let Ok(value) = forwarded.to_str()
26 && let Some(first_ip) = value.split(',').next()
27 {
28 return first_ip.trim().to_string();
29 }
30 if let Some(real_ip) = headers.get("x-real-ip")
31 && let Ok(value) = real_ip.to_str()
32 {
33 return value.trim().to_string();
34 }
35 "unknown".to_string()
36}
37
/// JSON request body accepted by `create_account`.
///
/// Field names are deserialized in camelCase (e.g. `inviteCode`, `didType`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountInput {
    /// Requested handle. A value without a dot, or one ending in
    /// `.<PDS_HOSTNAME>`, is validated as a short handle under the PDS
    /// hostname; any other value is treated as a full handle and may only
    /// contain ASCII alphanumerics, `.` and `-`.
    pub handle: String,
    /// Email address; trimmed before use and required when the verification
    /// channel is `email` (for non-migration signups).
    pub email: Option<String>,
    /// Plaintext password; checked with `validate_password` and stored as a
    /// bcrypt hash.
    pub password: String,
    /// Invite code looked up in `invite_codes`; mandatory when the
    /// `INVITE_CODE_REQUIRED` environment variable is `true` or `1`.
    pub invite_code: Option<String>,
    /// Caller-supplied DID (`did:web:…`, or `did:plc:…` when migrating with a
    /// service token). When absent or blank, a new `did:plc` is registered.
    pub did: Option<String>,
    /// DID flavour: `web`, `web-external`, or anything else (default `plc`).
    pub did_type: Option<String>,
    /// `did:key` of a previously reserved signing key, matched against
    /// `reserved_signing_keys.public_key_did_key`. When absent a fresh key is
    /// generated.
    pub signing_key: Option<String>,
    /// One of `email`, `discord`, `telegram`, `signal`; defaults to `email`.
    pub verification_channel: Option<String>,
    /// Discord identifier; required when the verification channel is `discord`.
    pub discord_id: Option<String>,
    /// Telegram username; required when the verification channel is `telegram`.
    pub telegram_username: Option<String>,
    /// Signal phone number; required when the verification channel is `signal`.
    pub signal_number: Option<String>,
}
53
/// JSON response body returned by `create_account` on success.
///
/// Field names are serialized in camelCase (e.g. `accessJwt`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
    /// Full handle assigned to the account.
    pub handle: String,
    /// DID of the account.
    pub did: String,
    /// Access token; only populated on the migration paths and omitted from
    /// the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub access_jwt: Option<String>,
    /// Refresh token; only populated on the migration paths and omitted from
    /// the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_jwt: Option<String>,
    /// `true` for regular signups (a verification code was issued), `false`
    /// for migrations.
    pub verification_required: bool,
    /// Channel the verification code is sent through.
    pub verification_channel: String,
}
66
67pub async fn create_account(
68 State(state): State<AppState>,
69 headers: HeaderMap,
70 Json(input): Json<CreateAccountInput>,
71) -> Response {
72 info!("create_account called");
73 let client_ip = extract_client_ip(&headers);
74 if !state
75 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
76 .await
77 {
78 warn!(ip = %client_ip, "Account creation rate limit exceeded");
79 return (
80 StatusCode::TOO_MANY_REQUESTS,
81 Json(json!({
82 "error": "RateLimitExceeded",
83 "message": "Too many account creation attempts. Please try again later."
84 })),
85 )
86 .into_response();
87 }
88
89 let migration_auth = if let Some(token) =
90 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
91 {
92 if is_service_token(&token) {
93 let verifier = ServiceTokenVerifier::new();
94 match verifier
95 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
96 .await
97 {
98 Ok(claims) => {
99 debug!("Service token verified for migration: iss={}", claims.iss);
100 Some(claims.iss)
101 }
102 Err(e) => {
103 error!("Service token verification failed: {:?}", e);
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({
107 "error": "AuthenticationFailed",
108 "message": format!("Service token verification failed: {}", e)
109 })),
110 )
111 .into_response();
112 }
113 }
114 } else {
115 None
116 }
117 } else {
118 None
119 };
120
121 let is_migration = migration_auth.is_some()
122 && input
123 .did
124 .as_ref()
125 .map(|d| d.starts_with("did:plc:") || d.starts_with("did:web:"))
126 .unwrap_or(false);
127
128 let is_did_web_byod = migration_auth.is_some()
129 && input
130 .did
131 .as_ref()
132 .map(|d| d.starts_with("did:web:"))
133 .unwrap_or(false);
134
135 if is_migration {
136 if let (Some(migration_did), Some(auth_did)) = (input.did.as_ref(), migration_auth.as_ref())
137 {
138 if migration_did != auth_did {
139 return (
140 StatusCode::FORBIDDEN,
141 Json(json!({
142 "error": "AuthorizationError",
143 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
144 })),
145 )
146 .into_response();
147 }
148 if is_did_web_byod {
149 info!(did = %migration_did, "Processing did:web BYOD account creation");
150 } else {
151 info!(did = %migration_did, "Processing account migration");
152 }
153 }
154 }
155
156 let hostname_for_validation =
157 std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
158 let pds_suffix = format!(".{}", hostname_for_validation);
159
160 let validated_short_handle = if !input.handle.contains('.')
161 || input.handle.ends_with(&pds_suffix)
162 {
163 let handle_to_validate = if input.handle.ends_with(&pds_suffix) {
164 input
165 .handle
166 .strip_suffix(&pds_suffix)
167 .unwrap_or(&input.handle)
168 } else {
169 &input.handle
170 };
171 match crate::api::validation::validate_short_handle(handle_to_validate) {
172 Ok(h) => h,
173 Err(e) => {
174 return (
175 StatusCode::BAD_REQUEST,
176 Json(json!({"error": "InvalidHandle", "message": e.to_string()})),
177 )
178 .into_response();
179 }
180 }
181 } else {
182 if input.handle.contains(' ') || input.handle.contains('\t') {
183 return (
184 StatusCode::BAD_REQUEST,
185 Json(json!({"error": "InvalidHandle", "message": "Handle cannot contain spaces"})),
186 )
187 .into_response();
188 }
189 for c in input.handle.chars() {
190 if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
191 return (
192 StatusCode::BAD_REQUEST,
193 Json(json!({"error": "InvalidHandle", "message": format!("Handle contains invalid character: {}", c)})),
194 )
195 .into_response();
196 }
197 }
198 input.handle.to_lowercase()
199 };
200 let email: Option<String> = input
201 .email
202 .as_ref()
203 .map(|e| e.trim().to_string())
204 .filter(|e| !e.is_empty());
205 if let Some(ref email) = email
206 && !crate::api::validation::is_valid_email(email)
207 {
208 return (
209 StatusCode::BAD_REQUEST,
210 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
211 )
212 .into_response();
213 }
214 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
215 let valid_channels = ["email", "discord", "telegram", "signal"];
216 if !valid_channels.contains(&verification_channel) && !is_migration {
217 return (
218 StatusCode::BAD_REQUEST,
219 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
220 )
221 .into_response();
222 }
223 let verification_recipient = if is_migration {
224 None
225 } else {
226 Some(match verification_channel {
227 "email" => match &input.email {
228 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
229 _ => return (
230 StatusCode::BAD_REQUEST,
231 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
232 ).into_response(),
233 },
234 "discord" => match &input.discord_id {
235 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
236 _ => return (
237 StatusCode::BAD_REQUEST,
238 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
239 ).into_response(),
240 },
241 "telegram" => match &input.telegram_username {
242 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
243 _ => return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
246 ).into_response(),
247 },
248 "signal" => match &input.signal_number {
249 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
250 _ => return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
253 ).into_response(),
254 },
255 _ => return (
256 StatusCode::BAD_REQUEST,
257 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
258 ).into_response(),
259 })
260 };
261 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
262 let pds_endpoint = format!("https://{}", hostname);
263 let suffix = format!(".{}", hostname);
264 let handle = if input.handle.ends_with(&suffix) {
265 format!("{}.{}", validated_short_handle, hostname)
266 } else if input.handle.contains('.') {
267 validated_short_handle.clone()
268 } else {
269 format!("{}.{}", validated_short_handle, hostname)
270 };
271 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
272 if let Some(signing_key_did) = &input.signing_key {
273 let reserved = sqlx::query!(
274 r#"
275 SELECT id, private_key_bytes
276 FROM reserved_signing_keys
277 WHERE public_key_did_key = $1
278 AND used_at IS NULL
279 AND expires_at > NOW()
280 FOR UPDATE
281 "#,
282 signing_key_did
283 )
284 .fetch_optional(&state.db)
285 .await;
286 match reserved {
287 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
288 Ok(None) => {
289 return (
290 StatusCode::BAD_REQUEST,
291 Json(json!({
292 "error": "InvalidSigningKey",
293 "message": "Signing key not found, already used, or expired"
294 })),
295 )
296 .into_response();
297 }
298 Err(e) => {
299 error!("Error looking up reserved signing key: {:?}", e);
300 return (
301 StatusCode::INTERNAL_SERVER_ERROR,
302 Json(json!({"error": "InternalError"})),
303 )
304 .into_response();
305 }
306 }
307 } else {
308 let secret_key = SecretKey::random(&mut OsRng);
309 (secret_key.to_bytes().to_vec(), None)
310 };
311 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
312 Ok(k) => k,
313 Err(e) => {
314 error!("Error creating signing key: {:?}", e);
315 return (
316 StatusCode::INTERNAL_SERVER_ERROR,
317 Json(json!({"error": "InternalError"})),
318 )
319 .into_response();
320 }
321 };
322 let did_type = input.did_type.as_deref().unwrap_or("plc");
323 let did = match did_type {
324 "web" => {
325 let subdomain_host = format!("{}.{}", input.handle, hostname);
326 let encoded_subdomain = subdomain_host.replace(':', "%3A");
327 let self_hosted_did = format!("did:web:{}", encoded_subdomain);
328 info!(did = %self_hosted_did, "Creating self-hosted did:web account (subdomain)");
329 self_hosted_did
330 }
331 "web-external" => {
332 let d = match &input.did {
333 Some(d) if !d.trim().is_empty() => d,
334 _ => {
335 return (
336 StatusCode::BAD_REQUEST,
337 Json(json!({"error": "InvalidRequest", "message": "External did:web requires the 'did' field to be provided"})),
338 )
339 .into_response();
340 }
341 };
342 if !d.starts_with("did:web:") {
343 return (
344 StatusCode::BAD_REQUEST,
345 Json(
346 json!({"error": "InvalidDid", "message": "External DID must be a did:web"}),
347 ),
348 )
349 .into_response();
350 }
351 if !is_did_web_byod {
352 if let Err(e) =
353 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref()).await
354 {
355 return (
356 StatusCode::BAD_REQUEST,
357 Json(json!({"error": "InvalidDid", "message": e})),
358 )
359 .into_response();
360 }
361 }
362 info!(did = %d, "Creating external did:web account");
363 d.clone()
364 }
365 _ => {
366 if let Some(d) = &input.did {
367 if d.starts_with("did:plc:") && is_migration {
368 info!(did = %d, "Migration with existing did:plc");
369 d.clone()
370 } else if d.starts_with("did:web:") {
371 if !is_did_web_byod {
372 if let Err(e) =
373 verify_did_web(d, &hostname, &input.handle, input.signing_key.as_deref())
374 .await
375 {
376 return (
377 StatusCode::BAD_REQUEST,
378 Json(json!({"error": "InvalidDid", "message": e})),
379 )
380 .into_response();
381 }
382 }
383 d.clone()
384 } else if !d.trim().is_empty() {
385 return (
386 StatusCode::BAD_REQUEST,
387 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
388 )
389 .into_response();
390 } else {
391 let rotation_key = std::env::var("PLC_ROTATION_KEY")
392 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
393 let genesis_result = match create_genesis_operation(
394 &signing_key,
395 &rotation_key,
396 &handle,
397 &pds_endpoint,
398 ) {
399 Ok(r) => r,
400 Err(e) => {
401 error!("Error creating PLC genesis operation: {:?}", e);
402 return (
403 StatusCode::INTERNAL_SERVER_ERROR,
404 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
405 )
406 .into_response();
407 }
408 };
409 let plc_client = PlcClient::new(None);
410 if let Err(e) = plc_client
411 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
412 .await
413 {
414 error!("Failed to submit PLC genesis operation: {:?}", e);
415 return (
416 StatusCode::BAD_GATEWAY,
417 Json(json!({
418 "error": "UpstreamError",
419 "message": format!("Failed to register DID with PLC directory: {}", e)
420 })),
421 )
422 .into_response();
423 }
424 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
425 genesis_result.did
426 }
427 } else {
428 let rotation_key = std::env::var("PLC_ROTATION_KEY")
429 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
430 let genesis_result = match create_genesis_operation(
431 &signing_key,
432 &rotation_key,
433 &handle,
434 &pds_endpoint,
435 ) {
436 Ok(r) => r,
437 Err(e) => {
438 error!("Error creating PLC genesis operation: {:?}", e);
439 return (
440 StatusCode::INTERNAL_SERVER_ERROR,
441 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
442 )
443 .into_response();
444 }
445 };
446 let plc_client = PlcClient::new(None);
447 if let Err(e) = plc_client
448 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
449 .await
450 {
451 error!("Failed to submit PLC genesis operation: {:?}", e);
452 return (
453 StatusCode::BAD_GATEWAY,
454 Json(json!({
455 "error": "UpstreamError",
456 "message": format!("Failed to register DID with PLC directory: {}", e)
457 })),
458 )
459 .into_response();
460 }
461 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
462 genesis_result.did
463 }
464 }
465 };
466 let mut tx = match state.db.begin().await {
467 Ok(tx) => tx,
468 Err(e) => {
469 error!("Error starting transaction: {:?}", e);
470 return (
471 StatusCode::INTERNAL_SERVER_ERROR,
472 Json(json!({"error": "InternalError"})),
473 )
474 .into_response();
475 }
476 };
477 if is_migration {
478 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
479 sqlx::query_as("SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE")
480 .bind(&did)
481 .fetch_optional(&mut *tx)
482 .await
483 .unwrap_or(None);
484 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
485 if deactivated_at.is_some() {
486 info!(did = %did, old_handle = %old_handle, new_handle = %handle, "Preparing existing account for inbound migration");
487 let update_result: Result<_, sqlx::Error> =
488 sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
489 .bind(&handle)
490 .bind(account_id)
491 .execute(&mut *tx)
492 .await;
493 if let Err(e) = update_result {
494 if let Some(db_err) = e.as_database_error()
495 && db_err
496 .constraint()
497 .map(|c| c.contains("handle"))
498 .unwrap_or(false)
499 {
500 return (
501 StatusCode::BAD_REQUEST,
502 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
503 )
504 .into_response();
505 }
506 error!("Error reactivating account: {:?}", e);
507 return (
508 StatusCode::INTERNAL_SERVER_ERROR,
509 Json(json!({"error": "InternalError"})),
510 )
511 .into_response();
512 }
513 if let Err(e) = tx.commit().await {
514 error!("Error committing reactivation: {:?}", e);
515 return (
516 StatusCode::INTERNAL_SERVER_ERROR,
517 Json(json!({"error": "InternalError"})),
518 )
519 .into_response();
520 }
521 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
522 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
523 )
524 .bind(account_id)
525 .fetch_optional(&state.db)
526 .await
527 .unwrap_or(None);
528 let secret_key_bytes = match key_row {
529 Some((key_bytes, encryption_version)) => {
530 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
531 Ok(k) => k,
532 Err(e) => {
533 error!("Error decrypting key for reactivated account: {:?}", e);
534 return (
535 StatusCode::INTERNAL_SERVER_ERROR,
536 Json(json!({"error": "InternalError"})),
537 )
538 .into_response();
539 }
540 }
541 }
542 None => {
543 error!("No signing key found for reactivated account");
544 return (
545 StatusCode::INTERNAL_SERVER_ERROR,
546 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
547 )
548 .into_response();
549 }
550 };
551 let access_meta =
552 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
553 Ok(m) => m,
554 Err(e) => {
555 error!("Error creating access token: {:?}", e);
556 return (
557 StatusCode::INTERNAL_SERVER_ERROR,
558 Json(json!({"error": "InternalError"})),
559 )
560 .into_response();
561 }
562 };
563 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
564 &did,
565 &secret_key_bytes,
566 ) {
567 Ok(m) => m,
568 Err(e) => {
569 error!("Error creating refresh token: {:?}", e);
570 return (
571 StatusCode::INTERNAL_SERVER_ERROR,
572 Json(json!({"error": "InternalError"})),
573 )
574 .into_response();
575 }
576 };
577 let session_result: Result<_, sqlx::Error> = sqlx::query(
578 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
579 )
580 .bind(&did)
581 .bind(&access_meta.jti)
582 .bind(&refresh_meta.jti)
583 .bind(access_meta.expires_at)
584 .bind(refresh_meta.expires_at)
585 .execute(&state.db)
586 .await;
587 if let Err(e) = session_result {
588 error!("Error creating session: {:?}", e);
589 return (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(json!({"error": "InternalError"})),
592 )
593 .into_response();
594 }
595 return (
596 StatusCode::OK,
597 Json(CreateAccountOutput {
598 handle: handle.clone(),
599 did,
600 access_jwt: Some(access_meta.token),
601 refresh_jwt: Some(refresh_meta.token),
602 verification_required: false,
603 verification_channel: "email".to_string(),
604 }),
605 )
606 .into_response();
607 } else {
608 return (
609 StatusCode::BAD_REQUEST,
610 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
611 )
612 .into_response();
613 }
614 }
615 }
616 let exists_result: Option<(i32,)> =
617 sqlx::query_as("SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL")
618 .bind(&handle)
619 .fetch_optional(&mut *tx)
620 .await
621 .unwrap_or(None);
622 if exists_result.is_some() {
623 return (
624 StatusCode::BAD_REQUEST,
625 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
626 )
627 .into_response();
628 }
629 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
630 .map(|v| v == "true" || v == "1")
631 .unwrap_or(false);
632 if invite_code_required
633 && input
634 .invite_code
635 .as_ref()
636 .map(|c| c.trim().is_empty())
637 .unwrap_or(true)
638 {
639 return (
640 StatusCode::BAD_REQUEST,
641 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
642 )
643 .into_response();
644 }
645 if let Some(code) = &input.invite_code
646 && !code.trim().is_empty()
647 {
648 let invite_query = sqlx::query!(
649 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
650 code
651 )
652 .fetch_optional(&mut *tx)
653 .await;
654 match invite_query {
655 Ok(Some(row)) => {
656 if row.available_uses <= 0 {
657 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
658 }
659 let update_invite = sqlx::query!(
660 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
661 code
662 )
663 .execute(&mut *tx)
664 .await;
665 if let Err(e) = update_invite {
666 error!("Error updating invite code: {:?}", e);
667 return (
668 StatusCode::INTERNAL_SERVER_ERROR,
669 Json(json!({"error": "InternalError"})),
670 )
671 .into_response();
672 }
673 }
674 Ok(None) => {
675 return (
676 StatusCode::BAD_REQUEST,
677 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
678 )
679 .into_response();
680 }
681 Err(e) => {
682 error!("Error checking invite code: {:?}", e);
683 return (
684 StatusCode::INTERNAL_SERVER_ERROR,
685 Json(json!({"error": "InternalError"})),
686 )
687 .into_response();
688 }
689 }
690 }
691 if let Err(e) = validate_password(&input.password) {
692 return (
693 StatusCode::BAD_REQUEST,
694 Json(json!({
695 "error": "InvalidPassword",
696 "message": e.to_string()
697 })),
698 )
699 .into_response();
700 }
701
702 let password_hash = match hash(&input.password, DEFAULT_COST) {
703 Ok(h) => h,
704 Err(e) => {
705 error!("Error hashing password: {:?}", e);
706 return (
707 StatusCode::INTERNAL_SERVER_ERROR,
708 Json(json!({"error": "InternalError"})),
709 )
710 .into_response();
711 }
712 };
713 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
714 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
715 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
716 .fetch_one(&mut *tx)
717 .await
718 .map(|c| c.unwrap_or(0) == 0)
719 .unwrap_or(false);
720 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
721 Some(chrono::Utc::now())
722 } else {
723 None
724 };
725 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
726 r#"INSERT INTO users (
727 handle, email, did, password_hash,
728 preferred_comms_channel,
729 discord_id, telegram_username, signal_number,
730 is_admin, deactivated_at, email_verified
731 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
732 )
733 .bind(&handle)
734 .bind(&email)
735 .bind(&did)
736 .bind(&password_hash)
737 .bind(verification_channel)
738 .bind(
739 input
740 .discord_id
741 .as_deref()
742 .map(|s| s.trim())
743 .filter(|s| !s.is_empty()),
744 )
745 .bind(
746 input
747 .telegram_username
748 .as_deref()
749 .map(|s| s.trim())
750 .filter(|s| !s.is_empty()),
751 )
752 .bind(
753 input
754 .signal_number
755 .as_deref()
756 .map(|s| s.trim())
757 .filter(|s| !s.is_empty()),
758 )
759 .bind(is_first_user)
760 .bind(deactivated_at)
761 .bind(is_migration)
762 .fetch_one(&mut *tx)
763 .await;
764 let user_id = match user_insert {
765 Ok((id,)) => id,
766 Err(e) => {
767 if let Some(db_err) = e.as_database_error()
768 && db_err.code().as_deref() == Some("23505")
769 {
770 let constraint = db_err.constraint().unwrap_or("");
771 if constraint.contains("handle") || constraint.contains("users_handle") {
772 return (
773 StatusCode::BAD_REQUEST,
774 Json(json!({
775 "error": "HandleNotAvailable",
776 "message": "Handle already taken"
777 })),
778 )
779 .into_response();
780 } else if constraint.contains("email") || constraint.contains("users_email") {
781 return (
782 StatusCode::BAD_REQUEST,
783 Json(json!({
784 "error": "InvalidEmail",
785 "message": "Email already registered"
786 })),
787 )
788 .into_response();
789 } else if constraint.contains("did") || constraint.contains("users_did") {
790 return (
791 StatusCode::BAD_REQUEST,
792 Json(json!({
793 "error": "AccountAlreadyExists",
794 "message": "An account with this DID already exists"
795 })),
796 )
797 .into_response();
798 }
799 }
800 error!("Error inserting user: {:?}", e);
801 return (
802 StatusCode::INTERNAL_SERVER_ERROR,
803 Json(json!({"error": "InternalError"})),
804 )
805 .into_response();
806 }
807 };
808
809 if !is_migration
810 && let Some(ref recipient) = verification_recipient
811 && let Err(e) = sqlx::query!(
812 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, $2::comms_channel, $3, $4, $5)",
813 user_id,
814 verification_channel as _,
815 verification_code,
816 recipient,
817 code_expires_at
818 )
819 .execute(&mut *tx)
820 .await {
821 error!("Error inserting verification code: {:?}", e);
822 return (
823 StatusCode::INTERNAL_SERVER_ERROR,
824 Json(json!({"error": "InternalError"})),
825 )
826 .into_response();
827 }
828 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
829 Ok(enc) => enc,
830 Err(e) => {
831 error!("Error encrypting user key: {:?}", e);
832 return (
833 StatusCode::INTERNAL_SERVER_ERROR,
834 Json(json!({"error": "InternalError"})),
835 )
836 .into_response();
837 }
838 };
839 let key_insert = sqlx::query!(
840 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
841 user_id,
842 &encrypted_key_bytes[..],
843 crate::config::ENCRYPTION_VERSION
844 )
845 .execute(&mut *tx)
846 .await;
847 if let Err(e) = key_insert {
848 error!("Error inserting user key: {:?}", e);
849 return (
850 StatusCode::INTERNAL_SERVER_ERROR,
851 Json(json!({"error": "InternalError"})),
852 )
853 .into_response();
854 }
855 if let Some(key_id) = reserved_key_id {
856 let mark_used = sqlx::query!(
857 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
858 key_id
859 )
860 .execute(&mut *tx)
861 .await;
862 if let Err(e) = mark_used {
863 error!("Error marking reserved key as used: {:?}", e);
864 return (
865 StatusCode::INTERNAL_SERVER_ERROR,
866 Json(json!({"error": "InternalError"})),
867 )
868 .into_response();
869 }
870 }
871 let mst = Mst::new(Arc::new(state.block_store.clone()));
872 let mst_root = match mst.persist().await {
873 Ok(c) => c,
874 Err(e) => {
875 error!("Error persisting MST: {:?}", e);
876 return (
877 StatusCode::INTERNAL_SERVER_ERROR,
878 Json(json!({"error": "InternalError"})),
879 )
880 .into_response();
881 }
882 };
883 let rev = Tid::now(LimitedU32::MIN);
884 let (commit_bytes, _sig) = match create_signed_commit(&did, mst_root, &rev.to_string(), None, &signing_key) {
885 Ok(result) => result,
886 Err(e) => {
887 error!("Error creating genesis commit: {:?}", e);
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": "InternalError"})),
891 )
892 .into_response();
893 }
894 };
895 let commit_cid = match state.block_store.put(&commit_bytes).await {
896 Ok(c) => c,
897 Err(e) => {
898 error!("Error saving genesis commit: {:?}", e);
899 return (
900 StatusCode::INTERNAL_SERVER_ERROR,
901 Json(json!({"error": "InternalError"})),
902 )
903 .into_response();
904 }
905 };
906 let commit_cid_str = commit_cid.to_string();
907 let repo_insert = sqlx::query!(
908 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
909 user_id,
910 commit_cid_str
911 )
912 .execute(&mut *tx)
913 .await;
914 if let Err(e) = repo_insert {
915 error!("Error initializing repo: {:?}", e);
916 return (
917 StatusCode::INTERNAL_SERVER_ERROR,
918 Json(json!({"error": "InternalError"})),
919 )
920 .into_response();
921 }
922 if let Some(code) = &input.invite_code
923 && !code.trim().is_empty()
924 {
925 let use_insert = sqlx::query!(
926 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
927 code,
928 user_id
929 )
930 .execute(&mut *tx)
931 .await;
932 if let Err(e) = use_insert {
933 error!("Error recording invite usage: {:?}", e);
934 return (
935 StatusCode::INTERNAL_SERVER_ERROR,
936 Json(json!({"error": "InternalError"})),
937 )
938 .into_response();
939 }
940 }
941 if let Err(e) = tx.commit().await {
942 error!("Error committing transaction: {:?}", e);
943 return (
944 StatusCode::INTERNAL_SERVER_ERROR,
945 Json(json!({"error": "InternalError"})),
946 )
947 .into_response();
948 }
949 if !is_migration {
950 if let Err(e) =
951 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
952 {
953 warn!("Failed to sequence identity event for {}: {}", did, e);
954 }
955 if let Err(e) =
956 crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
957 {
958 warn!("Failed to sequence account event for {}: {}", did, e);
959 }
960 let profile_record = json!({
961 "$type": "app.bsky.actor.profile",
962 "displayName": input.handle
963 });
964 if let Err(e) = crate::api::repo::record::create_record_internal(
965 &state,
966 &did,
967 "app.bsky.actor.profile",
968 "self",
969 &profile_record,
970 )
971 .await
972 {
973 warn!("Failed to create default profile for {}: {}", did, e);
974 }
975 if let Some(ref recipient) = verification_recipient
976 && let Err(e) = crate::comms::enqueue_signup_verification(
977 &state.db,
978 user_id,
979 verification_channel,
980 recipient,
981 &verification_code,
982 None,
983 )
984 .await
985 {
986 warn!(
987 "Failed to enqueue signup verification notification: {:?}",
988 e
989 );
990 }
991 }
992
993 let (access_jwt, refresh_jwt) = if is_migration {
994 let access_meta =
995 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
996 Ok(m) => m,
997 Err(e) => {
998 error!("Error creating access token for migration: {:?}", e);
999 return (
1000 StatusCode::INTERNAL_SERVER_ERROR,
1001 Json(json!({"error": "InternalError"})),
1002 )
1003 .into_response();
1004 }
1005 };
1006 let refresh_meta =
1007 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
1008 Ok(m) => m,
1009 Err(e) => {
1010 error!("Error creating refresh token for migration: {:?}", e);
1011 return (
1012 StatusCode::INTERNAL_SERVER_ERROR,
1013 Json(json!({"error": "InternalError"})),
1014 )
1015 .into_response();
1016 }
1017 };
1018 if let Err(e) = sqlx::query!(
1019 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
1020 did,
1021 access_meta.jti,
1022 refresh_meta.jti,
1023 access_meta.expires_at,
1024 refresh_meta.expires_at
1025 )
1026 .execute(&state.db)
1027 .await
1028 {
1029 error!("Error creating session for migration: {:?}", e);
1030 return (
1031 StatusCode::INTERNAL_SERVER_ERROR,
1032 Json(json!({"error": "InternalError"})),
1033 )
1034 .into_response();
1035 }
1036 (Some(access_meta.token), Some(refresh_meta.token))
1037 } else {
1038 (None, None)
1039 };
1040
1041 (
1042 StatusCode::OK,
1043 Json(CreateAccountOutput {
1044 handle: handle.clone(),
1045 did,
1046 access_jwt,
1047 refresh_jwt,
1048 verification_required: !is_migration,
1049 verification_channel: verification_channel.to_string(),
1050 }),
1051 )
1052 .into_response()
1053}