this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17 18static SERVER_URL: OnceLock<String> = OnceLock::new(); 19static APP_PORT: OnceLock<u16> = OnceLock::new(); 20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 21 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::core::ContainerPort; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 26#[cfg(not(feature = "external-infra"))] 27use testcontainers_modules::postgres::Postgres; 28 29#[cfg(not(feature = "external-infra"))] 30static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 31#[cfg(not(feature = "external-infra"))] 32static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 33 34#[allow(dead_code)] 35pub const AUTH_TOKEN: &str = "test-token"; 36#[allow(dead_code)] 37pub const BAD_AUTH_TOKEN: &str = "bad-token"; 38#[allow(dead_code)] 39pub const AUTH_DID: &str = "did:plc:fake"; 40#[allow(dead_code)] 41pub const TARGET_DID: &str = "did:plc:target"; 42 43fn has_external_infra() -> bool { 44 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 45 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 46} 47 48#[cfg(test)] 49#[ctor::dtor] 50fn cleanup() { 51 if has_external_infra() { 52 return; 53 } 54 55 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 56 let _ = std::process::Command::new("podman") 57 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 58 .output(); 59 } 60 61 let _ = std::process::Command::new("docker") 62 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 63 .output(); 64} 65 66#[allow(dead_code)] 67pub fn client() -> Client { 68 Client::new() 69} 70 71#[allow(dead_code)] 72pub fn app_port() -> u16 { 73 *APP_PORT.get().expect("APP_PORT not initialized") 74} 75 76pub async fn base_url() -> &'static str { 77 SERVER_URL.get_or_init(|| { 78 let (tx, rx) = std::sync::mpsc::channel(); 79 80 std::thread::spawn(move || { 81 if std::env::var("DOCKER_HOST").is_err() { 82 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 83 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 84 if podman_sock.exists() { 85 unsafe { 86 std::env::set_var( 87 "DOCKER_HOST", 88 format!("unix://{}", podman_sock.display()), 89 ); 90 } 91 } 92 } 93 } 94 95 let rt = tokio::runtime::Runtime::new().unwrap(); 96 rt.block_on(async move { 97 if has_external_infra() { 98 let url = setup_with_external_infra().await; 99 tx.send(url).unwrap(); 100 } else { 101 let url = setup_with_testcontainers().await; 102 tx.send(url).unwrap(); 103 } 104 std::future::pending::<()>().await; 105 }); 106 }); 107 108 rx.recv().expect("Failed to start test server") 109 }) 110} 111 112async fn setup_with_external_infra() -> String { 113 let database_url = std::env::var("DATABASE_URL") 114 .expect("DATABASE_URL must be set when using external infra"); 115 let s3_endpoint = std::env::var("S3_ENDPOINT") 116 .expect("S3_ENDPOINT must be set when using external infra"); 117 118 unsafe { 119 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string())); 120 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string())); 121 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string())); 122 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string())); 123 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 124 } 125 126 let mock_server = MockServer::start().await; 127 setup_mock_appview(&mock_server).await; 128 129 unsafe { 130 std::env::set_var("APPVIEW_URL", mock_server.uri()); 131 } 132 MOCK_APPVIEW.set(mock_server).ok(); 133 134 spawn_app(database_url).await 135} 136 137#[cfg(not(feature = "external-infra"))] 138async fn setup_with_testcontainers() -> String { 139 let s3_container = GenericImage::new("minio/minio", "latest") 140 .with_exposed_port(ContainerPort::Tcp(9000)) 141 .with_env_var("MINIO_ROOT_USER", "minioadmin") 142 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 143 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 144 .with_label("bspds_test", "true") 145 .start() 146 .await 147 .expect("Failed to start MinIO"); 148 149 let s3_port = s3_container 150 .get_host_port_ipv4(9000) 151 .await 152 .expect("Failed to get S3 port"); 153 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 154 155 unsafe { 156 std::env::set_var("S3_BUCKET", "test-bucket"); 157 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 158 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 159 std::env::set_var("AWS_REGION", "us-east-1"); 160 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 161 } 162 163 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 164 .region("us-east-1") 165 .endpoint_url(&s3_endpoint) 166 .credentials_provider(Credentials::new( 167 "minioadmin", 168 "minioadmin", 169 None, 170 None, 171 "test", 172 )) 173 .load() 174 .await; 175 176 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 177 .force_path_style(true) 178 .build(); 179 let s3_client = S3Client::from_conf(s3_config); 180 181 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 182 183 let mock_server = MockServer::start().await; 184 setup_mock_appview(&mock_server).await; 185 186 unsafe { 187 std::env::set_var("APPVIEW_URL", mock_server.uri()); 188 } 189 MOCK_APPVIEW.set(mock_server).ok(); 190 191 S3_CONTAINER.set(s3_container).ok(); 192 193 let container = Postgres::default() 194 .with_tag("18-alpine") 195 .with_label("bspds_test", "true") 196 .start() 197 .await 198 .expect("Failed to start Postgres"); 199 let connection_string = format!( 200 "postgres://postgres:postgres@127.0.0.1:{}", 201 container 202 .get_host_port_ipv4(5432) 203 .await 204 .expect("Failed to get port") 205 ); 206 207 DB_CONTAINER.set(container).ok(); 208 209 spawn_app(connection_string).await 210} 211 212#[cfg(feature = "external-infra")] 213async fn setup_with_testcontainers() -> String { 214 panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."); 215} 216 217async fn setup_mock_appview(mock_server: &MockServer) { 218 Mock::given(method("GET")) 219 .and(path("/xrpc/app.bsky.actor.getProfile")) 220 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 221 "handle": "mock.handle", 222 "did": "did:plc:mock", 223 "displayName": "Mock User" 224 }))) 225 .mount(mock_server) 226 .await; 227 228 Mock::given(method("GET")) 229 .and(path("/xrpc/app.bsky.actor.searchActors")) 230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 231 "actors": [], 232 "cursor": null 233 }))) 234 .mount(mock_server) 235 .await; 236 237 Mock::given(method("GET")) 238 .and(path("/xrpc/app.bsky.feed.getTimeline")) 239 .respond_with( 240 ResponseTemplate::new(200) 241 .insert_header("atproto-repo-rev", "0") 242 .set_body_json(json!({ 243 "feed": [], 244 "cursor": null 245 })) 246 ) 247 .mount(mock_server) 248 .await; 249 250 Mock::given(method("GET")) 251 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 252 .respond_with( 253 ResponseTemplate::new(200) 254 .insert_header("atproto-repo-rev", "0") 255 .set_body_json(json!({ 256 "feed": [{ 257 "post": { 258 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 259 "cid": "bafyappview123", 260 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 261 "record": { 262 "$type": "app.bsky.feed.post", 263 "text": "Author feed post from appview", 264 "createdAt": "2025-01-01T00:00:00Z" 265 }, 266 "indexedAt": "2025-01-01T00:00:00Z" 267 } 268 }], 269 "cursor": "author-cursor" 270 })), 271 ) 272 .mount(mock_server) 273 .await; 274 275 Mock::given(method("GET")) 276 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 277 .respond_with( 278 ResponseTemplate::new(200) 279 .insert_header("atproto-repo-rev", "0") 280 .set_body_json(json!({ 281 "feed": [{ 282 "post": { 283 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 284 "cid": "bafyliked123", 285 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 286 "record": { 287 "$type": "app.bsky.feed.post", 288 "text": "Liked post from appview", 289 "createdAt": "2025-01-01T00:00:00Z" 290 }, 291 "indexedAt": "2025-01-01T00:00:00Z" 292 } 293 }], 294 "cursor": null 295 })), 296 ) 297 .mount(mock_server) 298 .await; 299 300 Mock::given(method("GET")) 301 .and(path("/xrpc/app.bsky.feed.getPostThread")) 302 .respond_with( 303 ResponseTemplate::new(200) 304 .insert_header("atproto-repo-rev", "0") 305 .set_body_json(json!({ 306 "thread": { 307 "$type": "app.bsky.feed.defs#threadViewPost", 308 "post": { 309 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 310 "cid": "bafythread123", 311 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 312 "record": { 313 "$type": "app.bsky.feed.post", 314 "text": "Thread post from appview", 315 "createdAt": "2025-01-01T00:00:00Z" 316 }, 317 "indexedAt": "2025-01-01T00:00:00Z" 318 }, 319 "replies": [] 320 } 321 })), 322 ) 323 .mount(mock_server) 324 .await; 325 326 Mock::given(method("GET")) 327 .and(path("/xrpc/app.bsky.feed.getFeed")) 328 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 329 "feed": [{ 330 "post": { 331 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 332 "cid": "bafyfeed123", 333 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 334 "record": { 335 "$type": "app.bsky.feed.post", 336 "text": "Custom feed post from appview", 337 "createdAt": "2025-01-01T00:00:00Z" 338 }, 339 "indexedAt": "2025-01-01T00:00:00Z" 340 } 341 }], 342 "cursor": null 343 }))) 344 .mount(mock_server) 345 .await; 346 347 Mock::given(method("POST")) 348 .and(path("/xrpc/app.bsky.notification.registerPush")) 349 .respond_with(ResponseTemplate::new(200)) 350 .mount(mock_server) 351 .await; 352} 353 354async fn spawn_app(database_url: String) -> String { 355 use bspds::rate_limit::RateLimiters; 356 357 let pool = PgPoolOptions::new() 358 .max_connections(50) 359 .connect(&database_url) 360 .await 361 .expect("Failed to connect to Postgres. Make sure the database is running."); 362 363 sqlx::migrate!("./migrations") 364 .run(&pool) 365 .await 366 .expect("Failed to run migrations"); 367 368 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 369 let addr = listener.local_addr().unwrap(); 370 APP_PORT.set(addr.port()).ok(); 371 372 unsafe { 373 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 374 } 375 376 let rate_limiters = RateLimiters::new() 377 .with_login_limit(10000) 378 .with_account_creation_limit(10000) 379 .with_password_reset_limit(10000) 380 .with_email_update_limit(10000) 381 .with_oauth_authorize_limit(10000) 382 .with_oauth_token_limit(10000); 383 384 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 385 386 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 387 388 let app = bspds::app(state); 389 390 tokio::spawn(async move { 391 axum::serve(listener, app).await.unwrap(); 392 }); 393 394 format!("http://{}", addr) 395} 396 397#[allow(dead_code)] 398pub async fn get_db_connection_string() -> String { 399 base_url().await; 400 401 if has_external_infra() { 402 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 403 } else { 404 #[cfg(not(feature = "external-infra"))] 405 { 406 let container = DB_CONTAINER.get().expect("DB container not initialized"); 407 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port"); 408 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 409 } 410 #[cfg(feature = "external-infra")] 411 { 412 panic!("DATABASE_URL must be set with external-infra feature"); 413 } 414 } 415} 416 417#[allow(dead_code)] 418pub async fn verify_new_account(client: &Client, did: &str) -> String { 419 let conn_str = get_db_connection_string().await; 420 let pool = sqlx::postgres::PgPoolOptions::new() 421 .max_connections(2) 422 .connect(&conn_str) 423 .await 424 .expect("Failed to connect to test database"); 425 426 let verification_code: String = sqlx::query_scalar!( 427 "SELECT email_confirmation_code FROM users WHERE did = $1", 428 did 429 ) 430 .fetch_one(&pool) 431 .await 432 .expect("Failed to get verification code") 433 .expect("No verification code found"); 434 435 let confirm_payload = json!({ 436 "did": did, 437 "verificationCode": verification_code 438 }); 439 440 let confirm_res = client 441 .post(format!( 442 "{}/xrpc/com.atproto.server.confirmSignup", 443 base_url().await 444 )) 445 .json(&confirm_payload) 446 .send() 447 .await 448 .expect("confirmSignup request failed"); 449 450 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 451 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 452 confirm_body["accessJwt"] 453 .as_str() 454 .expect("No accessJwt in confirmSignup response") 455 .to_string() 456} 457 458#[allow(dead_code)] 459pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 460 let res = client 461 .post(format!( 462 "{}/xrpc/com.atproto.repo.uploadBlob", 463 base_url().await 464 )) 465 .header(header::CONTENT_TYPE, mime) 466 .bearer_auth(AUTH_TOKEN) 467 .body(data) 468 .send() 469 .await 470 .expect("Failed to send uploadBlob request"); 471 472 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 473 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 474 body["blob"].clone() 475} 476 477#[allow(dead_code)] 478pub async fn create_test_post( 479 client: &Client, 480 text: &str, 481 reply_to: Option<Value>, 482) -> (String, String, String) { 483 let collection = "app.bsky.feed.post"; 484 let mut record = json!({ 485 "$type": collection, 486 "text": text, 487 "createdAt": Utc::now().to_rfc3339() 488 }); 489 490 if let Some(reply_obj) = reply_to { 491 record["reply"] = reply_obj; 492 } 493 494 let payload = json!({ 495 "repo": AUTH_DID, 496 "collection": collection, 497 "record": record 498 }); 499 500 let res = client 501 .post(format!( 502 "{}/xrpc/com.atproto.repo.createRecord", 503 base_url().await 504 )) 505 .bearer_auth(AUTH_TOKEN) 506 .json(&payload) 507 .send() 508 .await 509 .expect("Failed to send createRecord"); 510 511 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 512 let body: Value = res 513 .json() 514 .await 515 .expect("createRecord response was not JSON"); 516 517 let uri = body["uri"] 518 .as_str() 519 .expect("Response had no URI") 520 .to_string(); 521 let cid = body["cid"] 522 .as_str() 523 .expect("Response had no CID") 524 .to_string(); 525 let rkey = uri 526 .split('/') 527 .last() 528 .expect("URI was malformed") 529 .to_string(); 530 531 (uri, cid, rkey) 532} 533 534#[allow(dead_code)] 535pub async fn create_account_and_login(client: &Client) -> (String, String) { 536 let mut last_error = String::new(); 537 538 for attempt in 0..3 { 539 if attempt > 0 { 540 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 541 } 542 543 let handle = format!("user_{}", uuid::Uuid::new_v4()); 544 let payload = json!({ 545 "handle": handle, 546 "email": format!("{}@example.com", handle), 547 "password": "password" 548 }); 549 550 let res = match client 551 .post(format!( 552 "{}/xrpc/com.atproto.server.createAccount", 553 base_url().await 554 )) 555 .json(&payload) 556 .send() 557 .await 558 { 559 Ok(r) => r, 560 Err(e) => { 561 last_error = format!("Request failed: {}", e); 562 continue; 563 } 564 }; 565 566 if res.status() == StatusCode::OK { 567 let body: Value = res.json().await.expect("Invalid JSON"); 568 569 if let Some(access_jwt) = body["accessJwt"].as_str() { 570 let did = body["did"].as_str().expect("No did").to_string(); 571 return (access_jwt.to_string(), did); 572 } 573 574 let did = body["did"].as_str().expect("No did").to_string(); 575 576 let conn_str = get_db_connection_string().await; 577 let pool = sqlx::postgres::PgPoolOptions::new() 578 .max_connections(2) 579 .connect(&conn_str) 580 .await 581 .expect("Failed to connect to test database"); 582 583 let verification_code: String = sqlx::query_scalar!( 584 "SELECT email_confirmation_code FROM users WHERE did = $1", 585 &did 586 ) 587 .fetch_one(&pool) 588 .await 589 .expect("Failed to get verification code") 590 .expect("No verification code found"); 591 592 let confirm_payload = json!({ 593 "did": did, 594 "verificationCode": verification_code 595 }); 596 597 let confirm_res = client 598 .post(format!( 599 "{}/xrpc/com.atproto.server.confirmSignup", 600 base_url().await 601 )) 602 .json(&confirm_payload) 603 .send() 604 .await 605 .expect("confirmSignup request failed"); 606 607 if confirm_res.status() == StatusCode::OK { 608 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 609 let access_jwt = confirm_body["accessJwt"] 610 .as_str() 611 .expect("No accessJwt in confirmSignup response") 612 .to_string(); 613 return (access_jwt, did); 614 } 615 616 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 617 continue; 618 } 619 620 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 621 } 622 623 panic!("Failed to create account after 3 attempts: {}", last_error); 624}