this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use chrono::Utc; 5use reqwest::{Client, StatusCode, header}; 6use serde_json::{Value, json}; 7use sqlx::postgres::PgPoolOptions; 8use std::collections::HashMap; 9use std::sync::{Arc, OnceLock, RwLock}; 10#[allow(unused_imports)] 11use std::time::Duration; 12use tokio::net::TcpListener; 13use tranquil_pds::state::AppState; 14use wiremock::matchers::{method, path}; 15use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; 16 17static SERVER_URL: OnceLock<String> = OnceLock::new(); 18static APP_PORT: OnceLock<u16> = OnceLock::new(); 19static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 20static MOCK_PLC: OnceLock<MockServer> = OnceLock::new(); 21static TEST_DB_POOL: OnceLock<sqlx::PgPool> = OnceLock::new(); 22 23#[cfg(not(feature = "external-infra"))] 24use testcontainers::core::ContainerPort; 25#[cfg(not(feature = "external-infra"))] 26use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 27#[cfg(not(feature = "external-infra"))] 28use testcontainers_modules::postgres::Postgres; 29#[cfg(not(feature = "external-infra"))] 30static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 31#[cfg(not(feature = "external-infra"))] 32static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 33 34#[allow(dead_code)] 35pub const AUTH_TOKEN: &str = "test-token"; 36#[allow(dead_code)] 37pub const BAD_AUTH_TOKEN: &str = "bad-token"; 38#[allow(dead_code)] 39pub const AUTH_DID: &str = "did:plc:fake"; 40#[allow(dead_code)] 41pub const TARGET_DID: &str = "did:plc:target"; 42 43fn has_external_infra() -> bool { 44 std::env::var("TRANQUIL_PDS_TEST_INFRA_READY").is_ok() 45 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 46} 47#[cfg(test)] 48#[ctor::dtor] 49fn cleanup() { 50 if has_external_infra() { 51 return; 52 } 53 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 54 let _ = std::process::Command::new("podman") 55 .args(["rm", "-f", "--filter", "label=tranquil_pds_test=true"]) 56 .output(); 57 } 58 let _ = std::process::Command::new("docker") 59 .args([ 60 "container", 61 "prune", 62 "-f", 63 "--filter", 64 "label=tranquil_pds_test=true", 65 ]) 66 .output(); 67} 68 69#[allow(dead_code)] 70pub fn client() -> Client { 71 Client::new() 72} 73 74#[allow(dead_code)] 75pub fn app_port() -> u16 { 76 *APP_PORT.get().expect("APP_PORT not initialized") 77} 78 79pub async fn base_url() -> &'static str { 80 SERVER_URL.get_or_init(|| { 81 let (tx, rx) = std::sync::mpsc::channel(); 82 std::thread::spawn(move || { 83 unsafe { 84 std::env::set_var("TRANQUIL_PDS_ALLOW_INSECURE_SECRETS", "1"); 85 } 86 if std::env::var("DOCKER_HOST").is_err() 87 && let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") 88 { 89 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 90 if podman_sock.exists() { 91 unsafe { 92 std::env::set_var( 93 "DOCKER_HOST", 94 format!("unix://{}", podman_sock.display()), 95 ); 96 } 97 } 98 } 99 let rt = tokio::runtime::Runtime::new().unwrap(); 100 rt.block_on(async move { 101 if has_external_infra() { 102 let url = setup_with_external_infra().await; 103 tx.send(url).unwrap(); 104 } else { 105 let url = setup_with_testcontainers().await; 106 tx.send(url).unwrap(); 107 } 108 std::future::pending::<()>().await; 109 }); 110 }); 111 rx.recv().expect("Failed to start test server") 112 }) 113} 114 115async fn setup_with_external_infra() -> String { 116 let database_url = 117 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra"); 118 let s3_endpoint = 119 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra"); 120 let plc_url = setup_mock_plc_directory().await; 121 unsafe { 122 std::env::set_var( 123 "S3_BUCKET", 124 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()), 125 ); 126 std::env::set_var( 127 "AWS_ACCESS_KEY_ID", 128 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()), 129 ); 130 std::env::set_var( 131 "AWS_SECRET_ACCESS_KEY", 132 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()), 133 ); 134 std::env::set_var( 135 "AWS_REGION", 136 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), 137 ); 138 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 139 std::env::set_var("MAX_IMPORT_SIZE", "100000000"); 140 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 141 std::env::set_var("PLC_DIRECTORY_URL", &plc_url); 142 } 143 let mock_server = MockServer::start().await; 144 setup_mock_appview(&mock_server).await; 145 let mock_uri = mock_server.uri(); 146 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri); 147 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A")); 148 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await; 149 MOCK_APPVIEW.set(mock_server).ok(); 150 spawn_app(database_url).await 151} 152 153#[cfg(not(feature = "external-infra"))] 154async fn setup_with_testcontainers() -> String { 155 let s3_container = GenericImage::new("minio/minio", "latest") 156 .with_exposed_port(ContainerPort::Tcp(9000)) 157 .with_env_var("MINIO_ROOT_USER", "minioadmin") 158 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 159 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 160 .with_label("tranquil_pds_test", "true") 161 .start() 162 .await 163 .expect("Failed to start MinIO"); 164 let s3_port = s3_container 165 .get_host_port_ipv4(9000) 166 .await 167 .expect("Failed to get S3 port"); 168 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 169 let plc_url = setup_mock_plc_directory().await; 170 unsafe { 171 std::env::set_var("S3_BUCKET", "test-bucket"); 172 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 173 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 174 std::env::set_var("AWS_REGION", "us-east-1"); 175 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 176 std::env::set_var("MAX_IMPORT_SIZE", "100000000"); 177 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 178 std::env::set_var("PLC_DIRECTORY_URL", &plc_url); 179 } 180 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 181 .region("us-east-1") 182 .endpoint_url(&s3_endpoint) 183 .credentials_provider(Credentials::new( 184 "minioadmin", 185 "minioadmin", 186 None, 187 None, 188 "test", 189 )) 190 .load() 191 .await; 192 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 193 .force_path_style(true) 194 .build(); 195 let s3_client = S3Client::from_conf(s3_config); 196 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 197 let mock_server = MockServer::start().await; 198 setup_mock_appview(&mock_server).await; 199 let mock_uri = mock_server.uri(); 200 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri); 201 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A")); 202 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await; 203 MOCK_APPVIEW.set(mock_server).ok(); 204 S3_CONTAINER.set(s3_container).ok(); 205 let container = Postgres::default() 206 .with_tag("18-alpine") 207 .with_label("tranquil_pds_test", "true") 208 .start() 209 .await 210 .expect("Failed to start Postgres"); 211 let connection_string = format!( 212 "postgres://postgres:postgres@127.0.0.1:{}", 213 container 214 .get_host_port_ipv4(5432) 215 .await 216 .expect("Failed to get port") 217 ); 218 DB_CONTAINER.set(container).ok(); 219 spawn_app(connection_string).await 220} 221 222#[cfg(feature = "external-infra")] 223async fn setup_with_testcontainers() -> String { 224 panic!( 225 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT." 226 ); 227} 228 229async fn setup_mock_did_document(mock_server: &MockServer, did: &str, service_endpoint: &str) { 230 Mock::given(method("GET")) 231 .and(path("/.well-known/did.json")) 232 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 233 "id": did, 234 "service": [{ 235 "id": "#atproto_appview", 236 "type": "AtprotoAppView", 237 "serviceEndpoint": service_endpoint 238 }] 239 }))) 240 .mount(mock_server) 241 .await; 242} 243 244async fn setup_mock_appview(_mock_server: &MockServer) {} 245 246type PlcOperationStore = Arc<RwLock<HashMap<String, Value>>>; 247 248struct PlcPostResponder { 249 store: PlcOperationStore, 250} 251 252impl Respond for PlcPostResponder { 253 fn respond(&self, request: &Request) -> ResponseTemplate { 254 let path = request.url.path(); 255 let did = urlencoding::decode(path.trim_start_matches('/')) 256 .unwrap_or_default() 257 .to_string(); 258 259 if let Ok(body) = serde_json::from_slice::<Value>(request.body.as_slice()) 260 && let Ok(mut store) = self.store.write() 261 { 262 store.insert(did, body); 263 } 264 ResponseTemplate::new(200) 265 } 266} 267 268struct PlcGetResponder { 269 store: PlcOperationStore, 270} 271 272impl Respond for PlcGetResponder { 273 fn respond(&self, request: &Request) -> ResponseTemplate { 274 let path = request.url.path(); 275 let path_clean = path.trim_start_matches('/'); 276 277 let (did, endpoint) = path_clean 278 .find("/log/") 279 .or_else(|| path_clean.find("/data")) 280 .map(|idx| { 281 let did = urlencoding::decode(&path_clean[..idx]) 282 .unwrap_or_default() 283 .to_string(); 284 let endpoint = &path_clean[idx..]; 285 (did, endpoint) 286 }) 287 .unwrap_or_else(|| { 288 ( 289 urlencoding::decode(path_clean) 290 .unwrap_or_default() 291 .to_string(), 292 "", 293 ) 294 }); 295 296 let store = self.store.read().unwrap(); 297 let operation = store.get(&did); 298 299 match endpoint { 300 "/log/last" => { 301 let response = operation.cloned().unwrap_or_else(|| { 302 json!({ 303 "type": "plc_operation", 304 "rotationKeys": [], 305 "verificationMethods": {}, 306 "alsoKnownAs": [], 307 "services": {}, 308 "prev": null 309 }) 310 }); 311 ResponseTemplate::new(200).set_body_json(response) 312 } 313 "/log/audit" => ResponseTemplate::new(200).set_body_json(json!([])), 314 "/data" => { 315 let response = operation 316 .map(|op| { 317 json!({ 318 "rotationKeys": op.get("rotationKeys").cloned().unwrap_or(json!([])), 319 "verificationMethods": op.get("verificationMethods").cloned().unwrap_or(json!({})), 320 "alsoKnownAs": op.get("alsoKnownAs").cloned().unwrap_or(json!([])), 321 "services": op.get("services").cloned().unwrap_or(json!({})) 322 }) 323 }) 324 .unwrap_or_else(|| { 325 json!({ 326 "rotationKeys": [], 327 "verificationMethods": {}, 328 "alsoKnownAs": [], 329 "services": {} 330 }) 331 }); 332 ResponseTemplate::new(200).set_body_json(response) 333 } 334 _ => { 335 let did_doc = operation 336 .map(|op| operation_to_did_document(&did, op)) 337 .unwrap_or_else(|| { 338 json!({ 339 "@context": ["https://www.w3.org/ns/did/v1"], 340 "id": did, 341 "alsoKnownAs": [], 342 "verificationMethod": [], 343 "service": [] 344 }) 345 }); 346 ResponseTemplate::new(200).set_body_json(did_doc) 347 } 348 } 349 } 350} 351 352fn operation_to_did_document(did: &str, op: &Value) -> Value { 353 let also_known_as = op 354 .get("alsoKnownAs") 355 .and_then(|v| v.as_array()) 356 .cloned() 357 .unwrap_or_default(); 358 359 let verification_methods: Vec<Value> = op 360 .get("verificationMethods") 361 .and_then(|v| v.as_object()) 362 .map(|methods| { 363 methods 364 .iter() 365 .map(|(key, value)| { 366 let did_key = value.as_str().unwrap_or(""); 367 let multikey = did_key_to_multikey(did_key); 368 json!({ 369 "id": format!("{}#{}", did, key), 370 "type": "Multikey", 371 "controller": did, 372 "publicKeyMultibase": multikey 373 }) 374 }) 375 .collect() 376 }) 377 .unwrap_or_default(); 378 379 let services: Vec<Value> = op 380 .get("services") 381 .and_then(|v| v.as_object()) 382 .map(|svcs| { 383 svcs.iter() 384 .map(|(key, value)| { 385 json!({ 386 "id": format!("#{}", key), 387 "type": value.get("type").and_then(|t| t.as_str()).unwrap_or(""), 388 "serviceEndpoint": value.get("endpoint").and_then(|e| e.as_str()).unwrap_or("") 389 }) 390 }) 391 .collect() 392 }) 393 .unwrap_or_default(); 394 395 json!({ 396 "@context": [ 397 "https://www.w3.org/ns/did/v1", 398 "https://w3id.org/security/multikey/v1" 399 ], 400 "id": did, 401 "alsoKnownAs": also_known_as, 402 "verificationMethod": verification_methods, 403 "service": services 404 }) 405} 406 407fn did_key_to_multikey(did_key: &str) -> String { 408 if !did_key.starts_with("did:key:z") { 409 return String::new(); 410 } 411 did_key[8..].to_string() 412} 413 414async fn setup_mock_plc_directory() -> String { 415 let mock_plc = MockServer::start().await; 416 let store: PlcOperationStore = Arc::new(RwLock::new(HashMap::new())); 417 418 Mock::given(method("POST")) 419 .respond_with(PlcPostResponder { 420 store: store.clone(), 421 }) 422 .mount(&mock_plc) 423 .await; 424 425 Mock::given(method("GET")) 426 .respond_with(PlcGetResponder { 427 store: store.clone(), 428 }) 429 .mount(&mock_plc) 430 .await; 431 432 let plc_url = mock_plc.uri(); 433 MOCK_PLC.set(mock_plc).ok(); 434 plc_url 435} 436 437async fn spawn_app(database_url: String) -> String { 438 use tranquil_pds::rate_limit::RateLimiters; 439 let pool = PgPoolOptions::new() 440 .max_connections(10) 441 .acquire_timeout(std::time::Duration::from_secs(30)) 442 .connect(&database_url) 443 .await 444 .expect("Failed to connect to Postgres. Make sure the database is running."); 445 sqlx::migrate!("./migrations") 446 .run(&pool) 447 .await 448 .expect("Failed to run migrations"); 449 let test_pool = PgPoolOptions::new() 450 .max_connections(5) 451 .acquire_timeout(std::time::Duration::from_secs(30)) 452 .connect(&database_url) 453 .await 454 .expect("Failed to create test pool"); 455 TEST_DB_POOL.set(test_pool).ok(); 456 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 457 let addr = listener.local_addr().unwrap(); 458 APP_PORT.set(addr.port()).ok(); 459 unsafe { 460 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 461 } 462 let rate_limiters = RateLimiters::new() 463 .with_login_limit(10000) 464 .with_account_creation_limit(10000) 465 .with_password_reset_limit(10000) 466 .with_email_update_limit(10000) 467 .with_oauth_authorize_limit(10000) 468 .with_oauth_token_limit(10000); 469 let state = AppState::from_db(pool) 470 .await 471 .with_rate_limiters(rate_limiters); 472 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 473 let app = tranquil_pds::app(state); 474 tokio::spawn(async move { 475 axum::serve(listener, app).await.unwrap(); 476 }); 477 format!("http://{}", addr) 478} 479 480#[allow(dead_code)] 481pub async fn get_db_connection_string() -> String { 482 base_url().await; 483 if has_external_infra() { 484 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 485 } else { 486 #[cfg(not(feature = "external-infra"))] 487 { 488 let container = DB_CONTAINER.get().expect("DB container not initialized"); 489 let port = container 490 .get_host_port_ipv4(5432) 491 .await 492 .expect("Failed to get port"); 493 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 494 } 495 #[cfg(feature = "external-infra")] 496 { 497 panic!("DATABASE_URL must be set with external-infra feature"); 498 } 499 } 500} 501 502#[allow(dead_code)] 503pub async fn get_test_db_pool() -> &'static sqlx::PgPool { 504 base_url().await; 505 TEST_DB_POOL.get().expect("TEST_DB_POOL not initialized") 506} 507 508#[allow(dead_code)] 509pub async fn verify_new_account(client: &Client, did: &str) -> String { 510 let pool = get_test_db_pool().await; 511 let body_text: String = sqlx::query_scalar!( 512 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1", 513 did 514 ) 515 .fetch_one(pool) 516 .await 517 .expect("Failed to get verification code"); 518 519 let lines: Vec<&str> = body_text.lines().collect(); 520 let verification_code = lines 521 .iter() 522 .enumerate() 523 .find(|(_, line)| line.contains("verification code is:") || line.contains("code is:")) 524 .and_then(|(i, _)| lines.get(i + 1).map(|s| s.trim().to_string())) 525 .or_else(|| { 526 body_text 527 .split_whitespace() 528 .find(|word| word.contains('-') && word.chars().filter(|c| *c == '-').count() >= 3) 529 .map(|s| s.to_string()) 530 }) 531 .unwrap_or_else(|| body_text.clone()); 532 533 let confirm_payload = json!({ 534 "did": did, 535 "verificationCode": verification_code 536 }); 537 let confirm_res = client 538 .post(format!( 539 "{}/xrpc/com.atproto.server.confirmSignup", 540 base_url().await 541 )) 542 .json(&confirm_payload) 543 .send() 544 .await 545 .expect("confirmSignup request failed"); 546 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 547 let confirm_body: Value = confirm_res 548 .json() 549 .await 550 .expect("Invalid JSON from confirmSignup"); 551 confirm_body["accessJwt"] 552 .as_str() 553 .expect("No accessJwt in confirmSignup response") 554 .to_string() 555} 556 557#[allow(dead_code)] 558pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 559 let res = client 560 .post(format!( 561 "{}/xrpc/com.atproto.repo.uploadBlob", 562 base_url().await 563 )) 564 .header(header::CONTENT_TYPE, mime) 565 .bearer_auth(AUTH_TOKEN) 566 .body(data) 567 .send() 568 .await 569 .expect("Failed to send uploadBlob request"); 570 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 571 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 572 body["blob"].clone() 573} 574 575#[allow(dead_code)] 576pub async fn create_test_post( 577 client: &Client, 578 text: &str, 579 reply_to: Option<Value>, 580) -> (String, String, String) { 581 let collection = "app.bsky.feed.post"; 582 let mut record = json!({ 583 "$type": collection, 584 "text": text, 585 "createdAt": Utc::now().to_rfc3339() 586 }); 587 if let Some(reply_obj) = reply_to { 588 record["reply"] = reply_obj; 589 } 590 let payload = json!({ 591 "repo": AUTH_DID, 592 "collection": collection, 593 "record": record 594 }); 595 let res = client 596 .post(format!( 597 "{}/xrpc/com.atproto.repo.createRecord", 598 base_url().await 599 )) 600 .bearer_auth(AUTH_TOKEN) 601 .json(&payload) 602 .send() 603 .await 604 .expect("Failed to send createRecord"); 605 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 606 let body: Value = res 607 .json() 608 .await 609 .expect("createRecord response was not JSON"); 610 let uri = body["uri"] 611 .as_str() 612 .expect("Response had no URI") 613 .to_string(); 614 let cid = body["cid"] 615 .as_str() 616 .expect("Response had no CID") 617 .to_string(); 618 let rkey = uri 619 .split('/') 620 .next_back() 621 .expect("URI was malformed") 622 .to_string(); 623 (uri, cid, rkey) 624} 625 626#[allow(dead_code)] 627pub async fn create_account_and_login(client: &Client) -> (String, String) { 628 create_account_and_login_internal(client, false).await 629} 630 631#[allow(dead_code)] 632pub async fn create_admin_account_and_login(client: &Client) -> (String, String) { 633 create_account_and_login_internal(client, true).await 634} 635 636async fn create_account_and_login_internal(client: &Client, make_admin: bool) -> (String, String) { 637 let mut last_error = String::new(); 638 for attempt in 0..3 { 639 if attempt > 0 { 640 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 641 } 642 let handle = format!("u{}", &uuid::Uuid::new_v4().simple().to_string()[..12]); 643 let payload = json!({ 644 "handle": handle, 645 "email": format!("{}@example.com", handle), 646 "password": "Testpass123!" 647 }); 648 let res = match client 649 .post(format!( 650 "{}/xrpc/com.atproto.server.createAccount", 651 base_url().await 652 )) 653 .json(&payload) 654 .send() 655 .await 656 { 657 Ok(r) => r, 658 Err(e) => { 659 last_error = format!("Request failed: {}", e); 660 continue; 661 } 662 }; 663 if res.status() == StatusCode::OK { 664 let body: Value = res.json().await.expect("Invalid JSON"); 665 let did = body["did"].as_str().expect("No did").to_string(); 666 let pool = get_test_db_pool().await; 667 if make_admin { 668 sqlx::query!("UPDATE users SET is_admin = TRUE WHERE did = $1", &did) 669 .execute(pool) 670 .await 671 .expect("Failed to mark user as admin"); 672 } 673 let verification_required = body["verificationRequired"].as_bool().unwrap_or(true); 674 if let Some(access_jwt) = body["accessJwt"].as_str() 675 && !verification_required 676 { 677 return (access_jwt.to_string(), did); 678 } 679 let body_text: String = sqlx::query_scalar!( 680 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1", 681 &did 682 ) 683 .fetch_one(pool) 684 .await 685 .expect("Failed to get verification from comms_queue"); 686 let lines: Vec<&str> = body_text.lines().collect(); 687 let verification_code = lines 688 .iter() 689 .enumerate() 690 .find(|(_, line): &(usize, &&str)| { 691 line.contains("verification code is:") || line.contains("code is:") 692 }) 693 .and_then(|(i, _)| lines.get(i + 1).map(|s: &&str| s.trim().to_string())) 694 .or_else(|| { 695 body_text 696 .split_whitespace() 697 .find(|word: &&str| { 698 word.contains('-') && word.chars().filter(|c| *c == '-').count() >= 3 699 }) 700 .map(|s: &str| s.to_string()) 701 }) 702 .unwrap_or_else(|| body_text.clone()); 703 704 let confirm_payload = json!({ 705 "did": did, 706 "verificationCode": verification_code 707 }); 708 let confirm_res = client 709 .post(format!( 710 "{}/xrpc/com.atproto.server.confirmSignup", 711 base_url().await 712 )) 713 .json(&confirm_payload) 714 .send() 715 .await 716 .expect("confirmSignup request failed"); 717 if confirm_res.status() == StatusCode::OK { 718 let confirm_body: Value = confirm_res 719 .json() 720 .await 721 .expect("Invalid JSON from confirmSignup"); 722 let access_jwt = confirm_body["accessJwt"] 723 .as_str() 724 .expect("No accessJwt in confirmSignup response") 725 .to_string(); 726 return (access_jwt, did); 727 } 728 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 729 continue; 730 } 731 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 732 } 733 panic!("Failed to create account after 3 attempts: {}", last_error); 734}