this repo has no description
1use super::did::verify_did_web;
2use crate::auth::{ServiceTokenVerifier, extract_bearer_token_from_header, is_service_token};
3use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
4use crate::state::{AppState, RateLimitKind};
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use bcrypt::{DEFAULT_COST, hash};
12use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use k256::{SecretKey, ecdsa::SigningKey};
15use rand::rngs::OsRng;
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21fn extract_client_ip(headers: &HeaderMap) -> String {
22 if let Some(forwarded) = headers.get("x-forwarded-for")
23 && let Ok(value) = forwarded.to_str()
24 && let Some(first_ip) = value.split(',').next() {
25 return first_ip.trim().to_string();
26 }
27 if let Some(real_ip) = headers.get("x-real-ip")
28 && let Ok(value) = real_ip.to_str() {
29 return value.trim().to_string();
30 }
31 "unknown".to_string()
32}
33
34#[derive(Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub struct CreateAccountInput {
37 pub handle: String,
38 pub email: Option<String>,
39 pub password: String,
40 pub invite_code: Option<String>,
41 pub did: Option<String>,
42 pub signing_key: Option<String>,
43 pub verification_channel: Option<String>,
44 pub discord_id: Option<String>,
45 pub telegram_username: Option<String>,
46 pub signal_number: Option<String>,
47}
48
49#[derive(Serialize)]
50#[serde(rename_all = "camelCase")]
51pub struct CreateAccountOutput {
52 pub handle: String,
53 pub did: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub access_jwt: Option<String>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub refresh_jwt: Option<String>,
58 pub verification_required: bool,
59 pub verification_channel: String,
60}
61
62pub async fn create_account(
63 State(state): State<AppState>,
64 headers: HeaderMap,
65 Json(input): Json<CreateAccountInput>,
66) -> Response {
67 info!("create_account called");
68 let client_ip = extract_client_ip(&headers);
69 if !state
70 .check_rate_limit(RateLimitKind::AccountCreation, &client_ip)
71 .await
72 {
73 warn!(ip = %client_ip, "Account creation rate limit exceeded");
74 return (
75 StatusCode::TOO_MANY_REQUESTS,
76 Json(json!({
77 "error": "RateLimitExceeded",
78 "message": "Too many account creation attempts. Please try again later."
79 })),
80 )
81 .into_response();
82 }
83
84 let migration_auth = if let Some(token) =
85 extract_bearer_token_from_header(headers.get("Authorization").and_then(|h| h.to_str().ok()))
86 {
87 if is_service_token(&token) {
88 let verifier = ServiceTokenVerifier::new();
89 match verifier
90 .verify_service_token(&token, Some("com.atproto.server.createAccount"))
91 .await
92 {
93 Ok(claims) => {
94 debug!("Service token verified for migration: iss={}", claims.iss);
95 Some(claims.iss)
96 }
97 Err(e) => {
98 error!("Service token verification failed: {:?}", e);
99 return (
100 StatusCode::UNAUTHORIZED,
101 Json(json!({
102 "error": "AuthenticationFailed",
103 "message": format!("Service token verification failed: {}", e)
104 })),
105 )
106 .into_response();
107 }
108 }
109 } else {
110 None
111 }
112 } else {
113 None
114 };
115
116 let is_migration = migration_auth.is_some()
117 && input.did.as_ref().map(|d| d.starts_with("did:plc:")).unwrap_or(false);
118
119 if is_migration {
120 let migration_did = input.did.as_ref().unwrap();
121 let auth_did = migration_auth.as_ref().unwrap();
122 if migration_did != auth_did {
123 return (
124 StatusCode::FORBIDDEN,
125 Json(json!({
126 "error": "AuthorizationError",
127 "message": format!("Service token issuer {} does not match DID {}", auth_did, migration_did)
128 })),
129 )
130 .into_response();
131 }
132 info!(did = %migration_did, "Processing account migration");
133 }
134
135 if input.handle.contains('!') || input.handle.contains('@') {
136 return (
137 StatusCode::BAD_REQUEST,
138 Json(
139 json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}),
140 ),
141 )
142 .into_response();
143 }
144 let email: Option<String> = input
145 .email
146 .as_ref()
147 .map(|e| e.trim().to_string())
148 .filter(|e| !e.is_empty());
149 if let Some(ref email) = email
150 && !crate::api::validation::is_valid_email(email) {
151 return (
152 StatusCode::BAD_REQUEST,
153 Json(json!({"error": "InvalidEmail", "message": "Invalid email format"})),
154 )
155 .into_response();
156 }
157 let verification_channel = input.verification_channel.as_deref().unwrap_or("email");
158 let valid_channels = ["email", "discord", "telegram", "signal"];
159 if !valid_channels.contains(&verification_channel) && !is_migration {
160 return (
161 StatusCode::BAD_REQUEST,
162 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel. Must be one of: email, discord, telegram, signal"})),
163 )
164 .into_response();
165 }
166 let verification_recipient = if is_migration {
167 None
168 } else {
169 Some(match verification_channel {
170 "email" => match &input.email {
171 Some(email) if !email.trim().is_empty() => email.trim().to_string(),
172 _ => return (
173 StatusCode::BAD_REQUEST,
174 Json(json!({"error": "MissingEmail", "message": "Email is required when using email verification"})),
175 ).into_response(),
176 },
177 "discord" => match &input.discord_id {
178 Some(id) if !id.trim().is_empty() => id.trim().to_string(),
179 _ => return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "MissingDiscordId", "message": "Discord ID is required when using Discord verification"})),
182 ).into_response(),
183 },
184 "telegram" => match &input.telegram_username {
185 Some(username) if !username.trim().is_empty() => username.trim().to_string(),
186 _ => return (
187 StatusCode::BAD_REQUEST,
188 Json(json!({"error": "MissingTelegramUsername", "message": "Telegram username is required when using Telegram verification"})),
189 ).into_response(),
190 },
191 "signal" => match &input.signal_number {
192 Some(number) if !number.trim().is_empty() => number.trim().to_string(),
193 _ => return (
194 StatusCode::BAD_REQUEST,
195 Json(json!({"error": "MissingSignalNumber", "message": "Signal phone number is required when using Signal verification"})),
196 ).into_response(),
197 },
198 _ => return (
199 StatusCode::BAD_REQUEST,
200 Json(json!({"error": "InvalidVerificationChannel", "message": "Invalid verification channel"})),
201 ).into_response(),
202 })
203 };
204 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
205 let pds_endpoint = format!("https://{}", hostname);
206 let suffix = format!(".{}", hostname);
207 let short_handle = if input.handle.ends_with(&suffix) {
208 input.handle.strip_suffix(&suffix).unwrap_or(&input.handle)
209 } else {
210 &input.handle
211 };
212 let full_handle = format!("{}.{}", short_handle, hostname);
213 let (secret_key_bytes, reserved_key_id): (Vec<u8>, Option<uuid::Uuid>) =
214 if let Some(signing_key_did) = &input.signing_key {
215 let reserved = sqlx::query!(
216 r#"
217 SELECT id, private_key_bytes
218 FROM reserved_signing_keys
219 WHERE public_key_did_key = $1
220 AND used_at IS NULL
221 AND expires_at > NOW()
222 FOR UPDATE
223 "#,
224 signing_key_did
225 )
226 .fetch_optional(&state.db)
227 .await;
228 match reserved {
229 Ok(Some(row)) => (row.private_key_bytes, Some(row.id)),
230 Ok(None) => {
231 return (
232 StatusCode::BAD_REQUEST,
233 Json(json!({
234 "error": "InvalidSigningKey",
235 "message": "Signing key not found, already used, or expired"
236 })),
237 )
238 .into_response();
239 }
240 Err(e) => {
241 error!("Error looking up reserved signing key: {:?}", e);
242 return (
243 StatusCode::INTERNAL_SERVER_ERROR,
244 Json(json!({"error": "InternalError"})),
245 )
246 .into_response();
247 }
248 }
249 } else {
250 let secret_key = SecretKey::random(&mut OsRng);
251 (secret_key.to_bytes().to_vec(), None)
252 };
253 let signing_key = match SigningKey::from_slice(&secret_key_bytes) {
254 Ok(k) => k,
255 Err(e) => {
256 error!("Error creating signing key: {:?}", e);
257 return (
258 StatusCode::INTERNAL_SERVER_ERROR,
259 Json(json!({"error": "InternalError"})),
260 )
261 .into_response();
262 }
263 };
264 let did = if let Some(d) = &input.did {
265 if d.trim().is_empty() {
266 let rotation_key = std::env::var("PLC_ROTATION_KEY")
267 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
268 let genesis_result = match create_genesis_operation(
269 &signing_key,
270 &rotation_key,
271 &full_handle,
272 &pds_endpoint,
273 ) {
274 Ok(r) => r,
275 Err(e) => {
276 error!("Error creating PLC genesis operation: {:?}", e);
277 return (
278 StatusCode::INTERNAL_SERVER_ERROR,
279 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
280 )
281 .into_response();
282 }
283 };
284 let plc_client = PlcClient::new(None);
285 if let Err(e) = plc_client
286 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
287 .await
288 {
289 error!("Failed to submit PLC genesis operation: {:?}", e);
290 return (
291 StatusCode::BAD_GATEWAY,
292 Json(json!({
293 "error": "UpstreamError",
294 "message": format!("Failed to register DID with PLC directory: {}", e)
295 })),
296 )
297 .into_response();
298 }
299 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
300 genesis_result.did
301 } else if d.starts_with("did:web:") {
302 if let Err(e) = verify_did_web(d, &hostname, &input.handle).await {
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({"error": "InvalidDid", "message": e})),
306 )
307 .into_response();
308 }
309 d.clone()
310 } else if d.starts_with("did:plc:") && is_migration {
311 d.clone()
312 } else {
313 return (
314 StatusCode::BAD_REQUEST,
315 Json(json!({"error": "InvalidDid", "message": "Only did:web DIDs can be provided; leave empty for did:plc. For migration with existing did:plc, provide service auth."})),
316 )
317 .into_response();
318 }
319 } else {
320 let rotation_key = std::env::var("PLC_ROTATION_KEY")
321 .unwrap_or_else(|_| signing_key_to_did_key(&signing_key));
322 let genesis_result = match create_genesis_operation(
323 &signing_key,
324 &rotation_key,
325 &full_handle,
326 &pds_endpoint,
327 ) {
328 Ok(r) => r,
329 Err(e) => {
330 error!("Error creating PLC genesis operation: {:?}", e);
331 return (
332 StatusCode::INTERNAL_SERVER_ERROR,
333 Json(json!({"error": "InternalError", "message": "Failed to create PLC operation"})),
334 )
335 .into_response();
336 }
337 };
338 let plc_client = PlcClient::new(None);
339 if let Err(e) = plc_client
340 .send_operation(&genesis_result.did, &genesis_result.signed_operation)
341 .await
342 {
343 error!("Failed to submit PLC genesis operation: {:?}", e);
344 return (
345 StatusCode::BAD_GATEWAY,
346 Json(json!({
347 "error": "UpstreamError",
348 "message": format!("Failed to register DID with PLC directory: {}", e)
349 })),
350 )
351 .into_response();
352 }
353 info!(did = %genesis_result.did, "Successfully registered DID with PLC directory");
354 genesis_result.did
355 };
356 let mut tx = match state.db.begin().await {
357 Ok(tx) => tx,
358 Err(e) => {
359 error!("Error starting transaction: {:?}", e);
360 return (
361 StatusCode::INTERNAL_SERVER_ERROR,
362 Json(json!({"error": "InternalError"})),
363 )
364 .into_response();
365 }
366 };
367 if is_migration {
368 let existing_account: Option<(uuid::Uuid, String, Option<chrono::DateTime<chrono::Utc>>)> =
369 sqlx::query_as(
370 "SELECT id, handle, deactivated_at FROM users WHERE did = $1 FOR UPDATE",
371 )
372 .bind(&did)
373 .fetch_optional(&mut *tx)
374 .await
375 .unwrap_or(None);
376 if let Some((account_id, old_handle, deactivated_at)) = existing_account {
377 if deactivated_at.is_some() {
378 info!(did = %did, old_handle = %old_handle, new_handle = %short_handle, "Preparing existing account for inbound migration");
379 let update_result: Result<_, sqlx::Error> = sqlx::query(
380 "UPDATE users SET handle = $1 WHERE id = $2",
381 )
382 .bind(short_handle)
383 .bind(account_id)
384 .execute(&mut *tx)
385 .await;
386 if let Err(e) = update_result {
387 if let Some(db_err) = e.as_database_error() {
388 if db_err.constraint().map(|c| c.contains("handle")).unwrap_or(false) {
389 return (
390 StatusCode::BAD_REQUEST,
391 Json(json!({"error": "HandleTaken", "message": "Handle already taken by another account"})),
392 )
393 .into_response();
394 }
395 }
396 error!("Error reactivating account: {:?}", e);
397 return (
398 StatusCode::INTERNAL_SERVER_ERROR,
399 Json(json!({"error": "InternalError"})),
400 )
401 .into_response();
402 }
403 if let Err(e) = tx.commit().await {
404 error!("Error committing reactivation: {:?}", e);
405 return (
406 StatusCode::INTERNAL_SERVER_ERROR,
407 Json(json!({"error": "InternalError"})),
408 )
409 .into_response();
410 }
411 let key_row: Option<(Vec<u8>, i32)> = sqlx::query_as(
412 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
413 )
414 .bind(account_id)
415 .fetch_optional(&state.db)
416 .await
417 .unwrap_or(None);
418 let secret_key_bytes = match key_row {
419 Some((key_bytes, encryption_version)) => {
420 match crate::config::decrypt_key(&key_bytes, Some(encryption_version)) {
421 Ok(k) => k,
422 Err(e) => {
423 error!("Error decrypting key for reactivated account: {:?}", e);
424 return (
425 StatusCode::INTERNAL_SERVER_ERROR,
426 Json(json!({"error": "InternalError"})),
427 )
428 .into_response();
429 }
430 }
431 }
432 None => {
433 error!("No signing key found for reactivated account");
434 return (
435 StatusCode::INTERNAL_SERVER_ERROR,
436 Json(json!({"error": "InternalError", "message": "Account signing key not found"})),
437 )
438 .into_response();
439 }
440 };
441 let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
442 Ok(m) => m,
443 Err(e) => {
444 error!("Error creating access token: {:?}", e);
445 return (
446 StatusCode::INTERNAL_SERVER_ERROR,
447 Json(json!({"error": "InternalError"})),
448 )
449 .into_response();
450 }
451 };
452 let refresh_meta = match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
453 Ok(m) => m,
454 Err(e) => {
455 error!("Error creating refresh token: {:?}", e);
456 return (
457 StatusCode::INTERNAL_SERVER_ERROR,
458 Json(json!({"error": "InternalError"})),
459 )
460 .into_response();
461 }
462 };
463 let session_result: Result<_, sqlx::Error> = sqlx::query(
464 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
465 )
466 .bind(&did)
467 .bind(&access_meta.jti)
468 .bind(&refresh_meta.jti)
469 .bind(access_meta.expires_at)
470 .bind(refresh_meta.expires_at)
471 .execute(&state.db)
472 .await;
473 if let Err(e) = session_result {
474 error!("Error creating session: {:?}", e);
475 return (
476 StatusCode::INTERNAL_SERVER_ERROR,
477 Json(json!({"error": "InternalError"})),
478 )
479 .into_response();
480 }
481 return (
482 StatusCode::OK,
483 Json(CreateAccountOutput {
484 handle: full_handle.clone(),
485 did,
486 access_jwt: Some(access_meta.token),
487 refresh_jwt: Some(refresh_meta.token),
488 verification_required: false,
489 verification_channel: "email".to_string(),
490 }),
491 )
492 .into_response();
493 } else {
494 return (
495 StatusCode::BAD_REQUEST,
496 Json(json!({"error": "AccountAlreadyExists", "message": "An active account with this DID already exists"})),
497 )
498 .into_response();
499 }
500 }
501 }
502 let exists_result: Option<(i32,)> = sqlx::query_as(
503 "SELECT 1 FROM users WHERE handle = $1 AND deactivated_at IS NULL",
504 )
505 .bind(short_handle)
506 .fetch_optional(&mut *tx)
507 .await
508 .unwrap_or(None);
509 if exists_result.is_some() {
510 return (
511 StatusCode::BAD_REQUEST,
512 Json(json!({"error": "HandleTaken", "message": "Handle already taken"})),
513 )
514 .into_response();
515 }
516 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
517 .map(|v| v == "true" || v == "1")
518 .unwrap_or(false);
519 if invite_code_required && input.invite_code.as_ref().map(|c| c.trim().is_empty()).unwrap_or(true) {
520 return (
521 StatusCode::BAD_REQUEST,
522 Json(json!({"error": "InvalidInviteCode", "message": "Invite code is required"})),
523 )
524 .into_response();
525 }
526 if let Some(code) = &input.invite_code {
527 if !code.trim().is_empty() {
528 let invite_query = sqlx::query!(
529 "SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE",
530 code
531 )
532 .fetch_optional(&mut *tx)
533 .await;
534 match invite_query {
535 Ok(Some(row)) => {
536 if row.available_uses <= 0 {
537 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
538 }
539 let update_invite = sqlx::query!(
540 "UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1",
541 code
542 )
543 .execute(&mut *tx)
544 .await;
545 if let Err(e) = update_invite {
546 error!("Error updating invite code: {:?}", e);
547 return (
548 StatusCode::INTERNAL_SERVER_ERROR,
549 Json(json!({"error": "InternalError"})),
550 )
551 .into_response();
552 }
553 }
554 Ok(None) => {
555 return (
556 StatusCode::BAD_REQUEST,
557 Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"})),
558 )
559 .into_response();
560 }
561 Err(e) => {
562 error!("Error checking invite code: {:?}", e);
563 return (
564 StatusCode::INTERNAL_SERVER_ERROR,
565 Json(json!({"error": "InternalError"})),
566 )
567 .into_response();
568 }
569 }
570 }
571 }
572 let password_hash = match hash(&input.password, DEFAULT_COST) {
573 Ok(h) => h,
574 Err(e) => {
575 error!("Error hashing password: {:?}", e);
576 return (
577 StatusCode::INTERNAL_SERVER_ERROR,
578 Json(json!({"error": "InternalError"})),
579 )
580 .into_response();
581 }
582 };
583 let verification_code = format!("{:06}", rand::random::<u32>() % 1_000_000);
584 let code_expires_at = chrono::Utc::now() + chrono::Duration::minutes(30);
585 let is_first_user = sqlx::query_scalar!("SELECT COUNT(*) as count FROM users")
586 .fetch_one(&mut *tx)
587 .await
588 .map(|c| c.unwrap_or(0) == 0)
589 .unwrap_or(false);
590 let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = if is_migration {
591 Some(chrono::Utc::now())
592 } else {
593 None
594 };
595 let user_insert: Result<(uuid::Uuid,), _> = sqlx::query_as(
596 r#"INSERT INTO users (
597 handle, email, did, password_hash,
598 preferred_comms_channel,
599 discord_id, telegram_username, signal_number,
600 is_admin, deactivated_at, email_verified
601 ) VALUES ($1, $2, $3, $4, $5::comms_channel, $6, $7, $8, $9, $10, $11) RETURNING id"#,
602 )
603 .bind(short_handle)
604 .bind(&email)
605 .bind(&did)
606 .bind(&password_hash)
607 .bind(verification_channel)
608 .bind(
609 input
610 .discord_id
611 .as_deref()
612 .map(|s| s.trim())
613 .filter(|s| !s.is_empty()),
614 )
615 .bind(
616 input
617 .telegram_username
618 .as_deref()
619 .map(|s| s.trim())
620 .filter(|s| !s.is_empty()),
621 )
622 .bind(
623 input
624 .signal_number
625 .as_deref()
626 .map(|s| s.trim())
627 .filter(|s| !s.is_empty()),
628 )
629 .bind(is_first_user)
630 .bind(deactivated_at)
631 .bind(is_migration)
632 .fetch_one(&mut *tx)
633 .await;
634 let user_id = match user_insert {
635 Ok((id,)) => id,
636 Err(e) => {
637 if let Some(db_err) = e.as_database_error()
638 && db_err.code().as_deref() == Some("23505") {
639 let constraint = db_err.constraint().unwrap_or("");
640 if constraint.contains("handle") || constraint.contains("users_handle") {
641 return (
642 StatusCode::BAD_REQUEST,
643 Json(json!({
644 "error": "HandleNotAvailable",
645 "message": "Handle already taken"
646 })),
647 )
648 .into_response();
649 } else if constraint.contains("email") || constraint.contains("users_email") {
650 return (
651 StatusCode::BAD_REQUEST,
652 Json(json!({
653 "error": "InvalidEmail",
654 "message": "Email already registered"
655 })),
656 )
657 .into_response();
658 } else if constraint.contains("did") || constraint.contains("users_did") {
659 return (
660 StatusCode::BAD_REQUEST,
661 Json(json!({
662 "error": "AccountAlreadyExists",
663 "message": "An account with this DID already exists"
664 })),
665 )
666 .into_response();
667 }
668 }
669 error!("Error inserting user: {:?}", e);
670 return (
671 StatusCode::INTERNAL_SERVER_ERROR,
672 Json(json!({"error": "InternalError"})),
673 )
674 .into_response();
675 }
676 };
677
678 if !is_migration {
679 if let Err(e) = sqlx::query!(
680 "INSERT INTO channel_verifications (user_id, channel, code, pending_identifier, expires_at) VALUES ($1, 'email', $2, $3, $4)",
681 user_id,
682 verification_code,
683 email,
684 code_expires_at
685 )
686 .execute(&mut *tx)
687 .await {
688 error!("Error inserting verification code: {:?}", e);
689 return (
690 StatusCode::INTERNAL_SERVER_ERROR,
691 Json(json!({"error": "InternalError"})),
692 )
693 .into_response();
694 }
695 }
696 let encrypted_key_bytes = match crate::config::encrypt_key(&secret_key_bytes) {
697 Ok(enc) => enc,
698 Err(e) => {
699 error!("Error encrypting user key: {:?}", e);
700 return (
701 StatusCode::INTERNAL_SERVER_ERROR,
702 Json(json!({"error": "InternalError"})),
703 )
704 .into_response();
705 }
706 };
707 let key_insert = sqlx::query!(
708 "INSERT INTO user_keys (user_id, key_bytes, encryption_version, encrypted_at) VALUES ($1, $2, $3, NOW())",
709 user_id,
710 &encrypted_key_bytes[..],
711 crate::config::ENCRYPTION_VERSION
712 )
713 .execute(&mut *tx)
714 .await;
715 if let Err(e) = key_insert {
716 error!("Error inserting user key: {:?}", e);
717 return (
718 StatusCode::INTERNAL_SERVER_ERROR,
719 Json(json!({"error": "InternalError"})),
720 )
721 .into_response();
722 }
723 if let Some(key_id) = reserved_key_id {
724 let mark_used = sqlx::query!(
725 "UPDATE reserved_signing_keys SET used_at = NOW() WHERE id = $1",
726 key_id
727 )
728 .execute(&mut *tx)
729 .await;
730 if let Err(e) = mark_used {
731 error!("Error marking reserved key as used: {:?}", e);
732 return (
733 StatusCode::INTERNAL_SERVER_ERROR,
734 Json(json!({"error": "InternalError"})),
735 )
736 .into_response();
737 }
738 }
739 let mst = Mst::new(Arc::new(state.block_store.clone()));
740 let mst_root = match mst.persist().await {
741 Ok(c) => c,
742 Err(e) => {
743 error!("Error persisting MST: {:?}", e);
744 return (
745 StatusCode::INTERNAL_SERVER_ERROR,
746 Json(json!({"error": "InternalError"})),
747 )
748 .into_response();
749 }
750 };
751 let did_obj = match Did::new(&did) {
752 Ok(d) => d,
753 Err(_) => {
754 return (
755 StatusCode::INTERNAL_SERVER_ERROR,
756 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
757 )
758 .into_response();
759 }
760 };
761 let rev = Tid::now(LimitedU32::MIN);
762 let unsigned_commit = Commit::new_unsigned(did_obj, mst_root, rev, None);
763 let signed_commit = match unsigned_commit.sign(&signing_key) {
764 Ok(c) => c,
765 Err(e) => {
766 error!("Error signing genesis commit: {:?}", e);
767 return (
768 StatusCode::INTERNAL_SERVER_ERROR,
769 Json(json!({"error": "InternalError"})),
770 )
771 .into_response();
772 }
773 };
774 let commit_bytes = match signed_commit.to_cbor() {
775 Ok(b) => b,
776 Err(e) => {
777 error!("Error serializing genesis commit: {:?}", e);
778 return (
779 StatusCode::INTERNAL_SERVER_ERROR,
780 Json(json!({"error": "InternalError"})),
781 )
782 .into_response();
783 }
784 };
785 let commit_cid = match state.block_store.put(&commit_bytes).await {
786 Ok(c) => c,
787 Err(e) => {
788 error!("Error saving genesis commit: {:?}", e);
789 return (
790 StatusCode::INTERNAL_SERVER_ERROR,
791 Json(json!({"error": "InternalError"})),
792 )
793 .into_response();
794 }
795 };
796 let commit_cid_str = commit_cid.to_string();
797 let repo_insert = sqlx::query!(
798 "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
799 user_id,
800 commit_cid_str
801 )
802 .execute(&mut *tx)
803 .await;
804 if let Err(e) = repo_insert {
805 error!("Error initializing repo: {:?}", e);
806 return (
807 StatusCode::INTERNAL_SERVER_ERROR,
808 Json(json!({"error": "InternalError"})),
809 )
810 .into_response();
811 }
812 if let Some(code) = &input.invite_code {
813 if !code.trim().is_empty() {
814 let use_insert = sqlx::query!(
815 "INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)",
816 code,
817 user_id
818 )
819 .execute(&mut *tx)
820 .await;
821 if let Err(e) = use_insert {
822 error!("Error recording invite usage: {:?}", e);
823 return (
824 StatusCode::INTERNAL_SERVER_ERROR,
825 Json(json!({"error": "InternalError"})),
826 )
827 .into_response();
828 }
829 }
830 }
831 if let Err(e) = tx.commit().await {
832 error!("Error committing transaction: {:?}", e);
833 return (
834 StatusCode::INTERNAL_SERVER_ERROR,
835 Json(json!({"error": "InternalError"})),
836 )
837 .into_response();
838 }
839 if !is_migration {
840 if let Err(e) =
841 crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle)).await
842 {
843 warn!("Failed to sequence identity event for {}: {}", did, e);
844 }
845 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
846 {
847 warn!("Failed to sequence account event for {}: {}", did, e);
848 }
849 let profile_record = json!({
850 "$type": "app.bsky.actor.profile",
851 "displayName": input.handle
852 });
853 if let Err(e) = crate::api::repo::record::create_record_internal(
854 &state,
855 &did,
856 "app.bsky.actor.profile",
857 "self",
858 &profile_record,
859 )
860 .await
861 {
862 warn!("Failed to create default profile for {}: {}", did, e);
863 }
864 if let Some(ref recipient) = verification_recipient {
865 if let Err(e) = crate::comms::enqueue_signup_verification(
866 &state.db,
867 user_id,
868 verification_channel,
869 recipient,
870 &verification_code,
871 )
872 .await
873 {
874 warn!(
875 "Failed to enqueue signup verification notification: {:?}",
876 e
877 );
878 }
879 }
880 }
881
882 let (access_jwt, refresh_jwt) = if is_migration {
883 let access_meta =
884 match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) {
885 Ok(m) => m,
886 Err(e) => {
887 error!("Error creating access token for migration: {:?}", e);
888 return (
889 StatusCode::INTERNAL_SERVER_ERROR,
890 Json(json!({"error": "InternalError"})),
891 )
892 .into_response();
893 }
894 };
895 let refresh_meta =
896 match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
897 Ok(m) => m,
898 Err(e) => {
899 error!("Error creating refresh token for migration: {:?}", e);
900 return (
901 StatusCode::INTERNAL_SERVER_ERROR,
902 Json(json!({"error": "InternalError"})),
903 )
904 .into_response();
905 }
906 };
907 if let Err(e) = sqlx::query!(
908 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
909 did,
910 access_meta.jti,
911 refresh_meta.jti,
912 access_meta.expires_at,
913 refresh_meta.expires_at
914 )
915 .execute(&state.db)
916 .await
917 {
918 error!("Error creating session for migration: {:?}", e);
919 return (
920 StatusCode::INTERNAL_SERVER_ERROR,
921 Json(json!({"error": "InternalError"})),
922 )
923 .into_response();
924 }
925 (Some(access_meta.token), Some(refresh_meta.token))
926 } else {
927 (None, None)
928 };
929
930 (
931 StatusCode::OK,
932 Json(CreateAccountOutput {
933 handle: full_handle.clone(),
934 did,
935 access_jwt,
936 refresh_jwt,
937 verification_required: !is_migration,
938 verification_channel: verification_channel.to_string(),
939 }),
940 )
941 .into_response()
942}