this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use chrono::Utc;
5use reqwest::{Client, StatusCode, header};
6use serde_json::{Value, json};
7use sqlx::postgres::PgPoolOptions;
8use std::collections::HashMap;
9use std::sync::{Arc, OnceLock, RwLock};
10#[allow(unused_imports)]
11use std::time::Duration;
12use tokio::net::TcpListener;
13use tranquil_pds::state::AppState;
14use wiremock::matchers::{method, path};
15use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
16
17static SERVER_URL: OnceLock<String> = OnceLock::new();
18static APP_PORT: OnceLock<u16> = OnceLock::new();
19static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
20static MOCK_PLC: OnceLock<MockServer> = OnceLock::new();
21static TEST_DB_POOL: OnceLock<sqlx::PgPool> = OnceLock::new();
22
23#[cfg(not(feature = "external-infra"))]
24use testcontainers::core::ContainerPort;
25#[cfg(not(feature = "external-infra"))]
26use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
27#[cfg(not(feature = "external-infra"))]
28use testcontainers_modules::postgres::Postgres;
29#[cfg(not(feature = "external-infra"))]
30static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
31#[cfg(not(feature = "external-infra"))]
32static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
33
34#[allow(dead_code)]
35pub const AUTH_TOKEN: &str = "test-token";
36#[allow(dead_code)]
37pub const BAD_AUTH_TOKEN: &str = "bad-token";
38#[allow(dead_code)]
39pub const AUTH_DID: &str = "did:plc:fake";
40#[allow(dead_code)]
41pub const TARGET_DID: &str = "did:plc:target";
42
43fn has_external_infra() -> bool {
44 std::env::var("TRANQUIL_PDS_TEST_INFRA_READY").is_ok()
45 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok())
46}
47#[cfg(test)]
48#[ctor::dtor]
49fn cleanup() {
50 if has_external_infra() {
51 return;
52 }
53 if std::env::var("XDG_RUNTIME_DIR").is_ok() {
54 let _ = std::process::Command::new("podman")
55 .args(["rm", "-f", "--filter", "label=tranquil_pds_test=true"])
56 .output();
57 }
58 let _ = std::process::Command::new("docker")
59 .args([
60 "container",
61 "prune",
62 "-f",
63 "--filter",
64 "label=tranquil_pds_test=true",
65 ])
66 .output();
67}
68
69#[allow(dead_code)]
70pub fn client() -> Client {
71 Client::new()
72}
73
74#[allow(dead_code)]
75pub fn app_port() -> u16 {
76 *APP_PORT.get().expect("APP_PORT not initialized")
77}
78
79pub async fn base_url() -> &'static str {
80 SERVER_URL.get_or_init(|| {
81 let (tx, rx) = std::sync::mpsc::channel();
82 std::thread::spawn(move || {
83 unsafe {
84 std::env::set_var("TRANQUIL_PDS_ALLOW_INSECURE_SECRETS", "1");
85 }
86 if std::env::var("DOCKER_HOST").is_err()
87 && let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR")
88 {
89 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
90 if podman_sock.exists() {
91 unsafe {
92 std::env::set_var(
93 "DOCKER_HOST",
94 format!("unix://{}", podman_sock.display()),
95 );
96 }
97 }
98 }
99 let rt = tokio::runtime::Runtime::new().unwrap();
100 rt.block_on(async move {
101 if has_external_infra() {
102 let url = setup_with_external_infra().await;
103 tx.send(url).unwrap();
104 } else {
105 let url = setup_with_testcontainers().await;
106 tx.send(url).unwrap();
107 }
108 std::future::pending::<()>().await;
109 });
110 });
111 rx.recv().expect("Failed to start test server")
112 })
113}
114
115async fn setup_with_external_infra() -> String {
116 let database_url =
117 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
118 let s3_endpoint =
119 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra");
120 let plc_url = setup_mock_plc_directory().await;
121 unsafe {
122 std::env::set_var(
123 "S3_BUCKET",
124 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
125 );
126 std::env::set_var(
127 "AWS_ACCESS_KEY_ID",
128 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
129 );
130 std::env::set_var(
131 "AWS_SECRET_ACCESS_KEY",
132 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
133 );
134 std::env::set_var(
135 "AWS_REGION",
136 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
137 );
138 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
139 std::env::set_var("MAX_IMPORT_SIZE", "100000000");
140 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
141 std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
142 }
143 let mock_server = MockServer::start().await;
144 setup_mock_appview(&mock_server).await;
145 let mock_uri = mock_server.uri();
146 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
147 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
148 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
149 MOCK_APPVIEW.set(mock_server).ok();
150 spawn_app(database_url).await
151}
152
153#[cfg(not(feature = "external-infra"))]
154async fn setup_with_testcontainers() -> String {
155 let s3_container = GenericImage::new("minio/minio", "latest")
156 .with_exposed_port(ContainerPort::Tcp(9000))
157 .with_env_var("MINIO_ROOT_USER", "minioadmin")
158 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
159 .with_cmd(vec!["server".to_string(), "/data".to_string()])
160 .with_label("tranquil_pds_test", "true")
161 .start()
162 .await
163 .expect("Failed to start MinIO");
164 let s3_port = s3_container
165 .get_host_port_ipv4(9000)
166 .await
167 .expect("Failed to get S3 port");
168 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
169 let plc_url = setup_mock_plc_directory().await;
170 unsafe {
171 std::env::set_var("S3_BUCKET", "test-bucket");
172 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
173 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
174 std::env::set_var("AWS_REGION", "us-east-1");
175 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
176 std::env::set_var("MAX_IMPORT_SIZE", "100000000");
177 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
178 std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
179 }
180 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
181 .region("us-east-1")
182 .endpoint_url(&s3_endpoint)
183 .credentials_provider(Credentials::new(
184 "minioadmin",
185 "minioadmin",
186 None,
187 None,
188 "test",
189 ))
190 .load()
191 .await;
192 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
193 .force_path_style(true)
194 .build();
195 let s3_client = S3Client::from_conf(s3_config);
196 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
197 let mock_server = MockServer::start().await;
198 setup_mock_appview(&mock_server).await;
199 let mock_uri = mock_server.uri();
200 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
201 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
202 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
203 MOCK_APPVIEW.set(mock_server).ok();
204 S3_CONTAINER.set(s3_container).ok();
205 let container = Postgres::default()
206 .with_tag("18-alpine")
207 .with_label("tranquil_pds_test", "true")
208 .start()
209 .await
210 .expect("Failed to start Postgres");
211 let connection_string = format!(
212 "postgres://postgres:postgres@127.0.0.1:{}",
213 container
214 .get_host_port_ipv4(5432)
215 .await
216 .expect("Failed to get port")
217 );
218 DB_CONTAINER.set(container).ok();
219 spawn_app(connection_string).await
220}
221
222#[cfg(feature = "external-infra")]
223async fn setup_with_testcontainers() -> String {
224 panic!(
225 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."
226 );
227}
228
229async fn setup_mock_did_document(mock_server: &MockServer, did: &str, service_endpoint: &str) {
230 Mock::given(method("GET"))
231 .and(path("/.well-known/did.json"))
232 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
233 "id": did,
234 "service": [{
235 "id": "#atproto_appview",
236 "type": "AtprotoAppView",
237 "serviceEndpoint": service_endpoint
238 }]
239 })))
240 .mount(mock_server)
241 .await;
242}
243
244async fn setup_mock_appview(_mock_server: &MockServer) {}
245
246type PlcOperationStore = Arc<RwLock<HashMap<String, Value>>>;
247
248struct PlcPostResponder {
249 store: PlcOperationStore,
250}
251
252impl Respond for PlcPostResponder {
253 fn respond(&self, request: &Request) -> ResponseTemplate {
254 let path = request.url.path();
255 let did = urlencoding::decode(path.trim_start_matches('/'))
256 .unwrap_or_default()
257 .to_string();
258
259 if let Ok(body) = serde_json::from_slice::<Value>(request.body.as_slice())
260 && let Ok(mut store) = self.store.write()
261 {
262 store.insert(did, body);
263 }
264 ResponseTemplate::new(200)
265 }
266}
267
268struct PlcGetResponder {
269 store: PlcOperationStore,
270}
271
272impl Respond for PlcGetResponder {
273 fn respond(&self, request: &Request) -> ResponseTemplate {
274 let path = request.url.path();
275 let path_clean = path.trim_start_matches('/');
276
277 let (did, endpoint) = path_clean
278 .find("/log/")
279 .or_else(|| path_clean.find("/data"))
280 .map(|idx| {
281 let did = urlencoding::decode(&path_clean[..idx])
282 .unwrap_or_default()
283 .to_string();
284 let endpoint = &path_clean[idx..];
285 (did, endpoint)
286 })
287 .unwrap_or_else(|| {
288 (
289 urlencoding::decode(path_clean)
290 .unwrap_or_default()
291 .to_string(),
292 "",
293 )
294 });
295
296 let store = self.store.read().unwrap();
297 let operation = store.get(&did);
298
299 match endpoint {
300 "/log/last" => {
301 let response = operation.cloned().unwrap_or_else(|| {
302 json!({
303 "type": "plc_operation",
304 "rotationKeys": [],
305 "verificationMethods": {},
306 "alsoKnownAs": [],
307 "services": {},
308 "prev": null
309 })
310 });
311 ResponseTemplate::new(200).set_body_json(response)
312 }
313 "/log/audit" => ResponseTemplate::new(200).set_body_json(json!([])),
314 "/data" => {
315 let response = operation
316 .map(|op| {
317 json!({
318 "rotationKeys": op.get("rotationKeys").cloned().unwrap_or(json!([])),
319 "verificationMethods": op.get("verificationMethods").cloned().unwrap_or(json!({})),
320 "alsoKnownAs": op.get("alsoKnownAs").cloned().unwrap_or(json!([])),
321 "services": op.get("services").cloned().unwrap_or(json!({}))
322 })
323 })
324 .unwrap_or_else(|| {
325 json!({
326 "rotationKeys": [],
327 "verificationMethods": {},
328 "alsoKnownAs": [],
329 "services": {}
330 })
331 });
332 ResponseTemplate::new(200).set_body_json(response)
333 }
334 _ => {
335 let did_doc = operation
336 .map(|op| operation_to_did_document(&did, op))
337 .unwrap_or_else(|| {
338 json!({
339 "@context": ["https://www.w3.org/ns/did/v1"],
340 "id": did,
341 "alsoKnownAs": [],
342 "verificationMethod": [],
343 "service": []
344 })
345 });
346 ResponseTemplate::new(200).set_body_json(did_doc)
347 }
348 }
349 }
350}
351
352fn operation_to_did_document(did: &str, op: &Value) -> Value {
353 let also_known_as = op
354 .get("alsoKnownAs")
355 .and_then(|v| v.as_array())
356 .cloned()
357 .unwrap_or_default();
358
359 let verification_methods: Vec<Value> = op
360 .get("verificationMethods")
361 .and_then(|v| v.as_object())
362 .map(|methods| {
363 methods
364 .iter()
365 .map(|(key, value)| {
366 let did_key = value.as_str().unwrap_or("");
367 let multikey = did_key_to_multikey(did_key);
368 json!({
369 "id": format!("{}#{}", did, key),
370 "type": "Multikey",
371 "controller": did,
372 "publicKeyMultibase": multikey
373 })
374 })
375 .collect()
376 })
377 .unwrap_or_default();
378
379 let services: Vec<Value> = op
380 .get("services")
381 .and_then(|v| v.as_object())
382 .map(|svcs| {
383 svcs.iter()
384 .map(|(key, value)| {
385 json!({
386 "id": format!("#{}", key),
387 "type": value.get("type").and_then(|t| t.as_str()).unwrap_or(""),
388 "serviceEndpoint": value.get("endpoint").and_then(|e| e.as_str()).unwrap_or("")
389 })
390 })
391 .collect()
392 })
393 .unwrap_or_default();
394
395 json!({
396 "@context": [
397 "https://www.w3.org/ns/did/v1",
398 "https://w3id.org/security/multikey/v1"
399 ],
400 "id": did,
401 "alsoKnownAs": also_known_as,
402 "verificationMethod": verification_methods,
403 "service": services
404 })
405}
406
407fn did_key_to_multikey(did_key: &str) -> String {
408 if !did_key.starts_with("did:key:z") {
409 return String::new();
410 }
411 did_key[8..].to_string()
412}
413
414async fn setup_mock_plc_directory() -> String {
415 let mock_plc = MockServer::start().await;
416 let store: PlcOperationStore = Arc::new(RwLock::new(HashMap::new()));
417
418 Mock::given(method("POST"))
419 .respond_with(PlcPostResponder {
420 store: store.clone(),
421 })
422 .mount(&mock_plc)
423 .await;
424
425 Mock::given(method("GET"))
426 .respond_with(PlcGetResponder {
427 store: store.clone(),
428 })
429 .mount(&mock_plc)
430 .await;
431
432 let plc_url = mock_plc.uri();
433 MOCK_PLC.set(mock_plc).ok();
434 plc_url
435}
436
437async fn spawn_app(database_url: String) -> String {
438 use tranquil_pds::rate_limit::RateLimiters;
439 let pool = PgPoolOptions::new()
440 .max_connections(10)
441 .acquire_timeout(std::time::Duration::from_secs(30))
442 .connect(&database_url)
443 .await
444 .expect("Failed to connect to Postgres. Make sure the database is running.");
445 sqlx::migrate!("./migrations")
446 .run(&pool)
447 .await
448 .expect("Failed to run migrations");
449 let test_pool = PgPoolOptions::new()
450 .max_connections(5)
451 .acquire_timeout(std::time::Duration::from_secs(30))
452 .connect(&database_url)
453 .await
454 .expect("Failed to create test pool");
455 TEST_DB_POOL.set(test_pool).ok();
456 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
457 let addr = listener.local_addr().unwrap();
458 APP_PORT.set(addr.port()).ok();
459 unsafe {
460 std::env::set_var("PDS_HOSTNAME", addr.to_string());
461 }
462 let rate_limiters = RateLimiters::new()
463 .with_login_limit(10000)
464 .with_account_creation_limit(10000)
465 .with_password_reset_limit(10000)
466 .with_email_update_limit(10000)
467 .with_oauth_authorize_limit(10000)
468 .with_oauth_token_limit(10000);
469 let state = AppState::from_db(pool)
470 .await
471 .with_rate_limiters(rate_limiters);
472 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
473 let app = tranquil_pds::app(state);
474 tokio::spawn(async move {
475 axum::serve(listener, app).await.unwrap();
476 });
477 format!("http://{}", addr)
478}
479
480#[allow(dead_code)]
481pub async fn get_db_connection_string() -> String {
482 base_url().await;
483 if has_external_infra() {
484 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
485 } else {
486 #[cfg(not(feature = "external-infra"))]
487 {
488 let container = DB_CONTAINER.get().expect("DB container not initialized");
489 let port = container
490 .get_host_port_ipv4(5432)
491 .await
492 .expect("Failed to get port");
493 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
494 }
495 #[cfg(feature = "external-infra")]
496 {
497 panic!("DATABASE_URL must be set with external-infra feature");
498 }
499 }
500}
501
502#[allow(dead_code)]
503pub async fn get_test_db_pool() -> &'static sqlx::PgPool {
504 base_url().await;
505 TEST_DB_POOL.get().expect("TEST_DB_POOL not initialized")
506}
507
508#[allow(dead_code)]
509pub async fn verify_new_account(client: &Client, did: &str) -> String {
510 let pool = get_test_db_pool().await;
511 let body_text: String = sqlx::query_scalar!(
512 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
513 did
514 )
515 .fetch_one(pool)
516 .await
517 .expect("Failed to get verification code");
518
519 let lines: Vec<&str> = body_text.lines().collect();
520 let verification_code = lines
521 .iter()
522 .enumerate()
523 .find(|(_, line)| line.contains("verification code is:") || line.contains("code is:"))
524 .and_then(|(i, _)| lines.get(i + 1).map(|s| s.trim().to_string()))
525 .or_else(|| {
526 body_text
527 .split_whitespace()
528 .find(|word| word.contains('-') && word.chars().filter(|c| *c == '-').count() >= 3)
529 .map(|s| s.to_string())
530 })
531 .unwrap_or_else(|| body_text.clone());
532
533 let confirm_payload = json!({
534 "did": did,
535 "verificationCode": verification_code
536 });
537 let confirm_res = client
538 .post(format!(
539 "{}/xrpc/com.atproto.server.confirmSignup",
540 base_url().await
541 ))
542 .json(&confirm_payload)
543 .send()
544 .await
545 .expect("confirmSignup request failed");
546 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
547 let confirm_body: Value = confirm_res
548 .json()
549 .await
550 .expect("Invalid JSON from confirmSignup");
551 confirm_body["accessJwt"]
552 .as_str()
553 .expect("No accessJwt in confirmSignup response")
554 .to_string()
555}
556
557#[allow(dead_code)]
558pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
559 let res = client
560 .post(format!(
561 "{}/xrpc/com.atproto.repo.uploadBlob",
562 base_url().await
563 ))
564 .header(header::CONTENT_TYPE, mime)
565 .bearer_auth(AUTH_TOKEN)
566 .body(data)
567 .send()
568 .await
569 .expect("Failed to send uploadBlob request");
570 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
571 let body: Value = res.json().await.expect("Blob upload response was not JSON");
572 body["blob"].clone()
573}
574
575#[allow(dead_code)]
576pub async fn create_test_post(
577 client: &Client,
578 text: &str,
579 reply_to: Option<Value>,
580) -> (String, String, String) {
581 let collection = "app.bsky.feed.post";
582 let mut record = json!({
583 "$type": collection,
584 "text": text,
585 "createdAt": Utc::now().to_rfc3339()
586 });
587 if let Some(reply_obj) = reply_to {
588 record["reply"] = reply_obj;
589 }
590 let payload = json!({
591 "repo": AUTH_DID,
592 "collection": collection,
593 "record": record
594 });
595 let res = client
596 .post(format!(
597 "{}/xrpc/com.atproto.repo.createRecord",
598 base_url().await
599 ))
600 .bearer_auth(AUTH_TOKEN)
601 .json(&payload)
602 .send()
603 .await
604 .expect("Failed to send createRecord");
605 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
606 let body: Value = res
607 .json()
608 .await
609 .expect("createRecord response was not JSON");
610 let uri = body["uri"]
611 .as_str()
612 .expect("Response had no URI")
613 .to_string();
614 let cid = body["cid"]
615 .as_str()
616 .expect("Response had no CID")
617 .to_string();
618 let rkey = uri
619 .split('/')
620 .next_back()
621 .expect("URI was malformed")
622 .to_string();
623 (uri, cid, rkey)
624}
625
626#[allow(dead_code)]
627pub async fn create_account_and_login(client: &Client) -> (String, String) {
628 create_account_and_login_internal(client, false).await
629}
630
631#[allow(dead_code)]
632pub async fn create_admin_account_and_login(client: &Client) -> (String, String) {
633 create_account_and_login_internal(client, true).await
634}
635
636async fn create_account_and_login_internal(client: &Client, make_admin: bool) -> (String, String) {
637 let mut last_error = String::new();
638 for attempt in 0..3 {
639 if attempt > 0 {
640 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
641 }
642 let handle = format!("u{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
643 let payload = json!({
644 "handle": handle,
645 "email": format!("{}@example.com", handle),
646 "password": "Testpass123!"
647 });
648 let res = match client
649 .post(format!(
650 "{}/xrpc/com.atproto.server.createAccount",
651 base_url().await
652 ))
653 .json(&payload)
654 .send()
655 .await
656 {
657 Ok(r) => r,
658 Err(e) => {
659 last_error = format!("Request failed: {}", e);
660 continue;
661 }
662 };
663 if res.status() == StatusCode::OK {
664 let body: Value = res.json().await.expect("Invalid JSON");
665 let did = body["did"].as_str().expect("No did").to_string();
666 let pool = get_test_db_pool().await;
667 if make_admin {
668 sqlx::query!("UPDATE users SET is_admin = TRUE WHERE did = $1", &did)
669 .execute(pool)
670 .await
671 .expect("Failed to mark user as admin");
672 }
673 let verification_required = body["verificationRequired"].as_bool().unwrap_or(true);
674 if let Some(access_jwt) = body["accessJwt"].as_str()
675 && !verification_required
676 {
677 return (access_jwt.to_string(), did);
678 }
679 let body_text: String = sqlx::query_scalar!(
680 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
681 &did
682 )
683 .fetch_one(pool)
684 .await
685 .expect("Failed to get verification from comms_queue");
686 let lines: Vec<&str> = body_text.lines().collect();
687 let verification_code = lines
688 .iter()
689 .enumerate()
690 .find(|(_, line): &(usize, &&str)| {
691 line.contains("verification code is:") || line.contains("code is:")
692 })
693 .and_then(|(i, _)| lines.get(i + 1).map(|s: &&str| s.trim().to_string()))
694 .or_else(|| {
695 body_text
696 .split_whitespace()
697 .find(|word: &&str| {
698 word.contains('-') && word.chars().filter(|c| *c == '-').count() >= 3
699 })
700 .map(|s: &str| s.to_string())
701 })
702 .unwrap_or_else(|| body_text.clone());
703
704 let confirm_payload = json!({
705 "did": did,
706 "verificationCode": verification_code
707 });
708 let confirm_res = client
709 .post(format!(
710 "{}/xrpc/com.atproto.server.confirmSignup",
711 base_url().await
712 ))
713 .json(&confirm_payload)
714 .send()
715 .await
716 .expect("confirmSignup request failed");
717 if confirm_res.status() == StatusCode::OK {
718 let confirm_body: Value = confirm_res
719 .json()
720 .await
721 .expect("Invalid JSON from confirmSignup");
722 let access_jwt = confirm_body["accessJwt"]
723 .as_str()
724 .expect("No accessJwt in confirmSignup response")
725 .to_string();
726 return (access_jwt, did);
727 }
728 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
729 continue;
730 }
731 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
732 }
733 panic!("Failed to create account after 3 attempts: {}", last_error);
734}