this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17 18static SERVER_URL: OnceLock<String> = OnceLock::new(); 19static APP_PORT: OnceLock<u16> = OnceLock::new(); 20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 21 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::core::ContainerPort; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 26#[cfg(not(feature = "external-infra"))] 27use testcontainers_modules::postgres::Postgres; 28#[cfg(not(feature = "external-infra"))] 29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 30#[cfg(not(feature = "external-infra"))] 31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 32 33#[allow(dead_code)] 34pub const AUTH_TOKEN: &str = "test-token"; 35#[allow(dead_code)] 36pub const BAD_AUTH_TOKEN: &str = "bad-token"; 37#[allow(dead_code)] 38pub const AUTH_DID: &str = "did:plc:fake"; 39#[allow(dead_code)] 40pub const TARGET_DID: &str = "did:plc:target"; 41 42fn has_external_infra() -> bool { 43 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 44 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 45} 46#[cfg(test)] 47#[ctor::dtor] 48fn cleanup() { 49 if has_external_infra() { 50 return; 51 } 52 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 53 let _ = std::process::Command::new("podman") 54 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 55 .output(); 56 } 57 let _ = std::process::Command::new("docker") 58 .args(&[ 59 "container", 60 "prune", 61 "-f", 62 "--filter", 63 "label=bspds_test=true", 64 ]) 65 .output(); 66} 67 68#[allow(dead_code)] 69pub fn client() -> Client { 70 Client::new() 71} 72 73#[allow(dead_code)] 74pub fn app_port() -> u16 { 75 *APP_PORT.get().expect("APP_PORT not initialized") 76} 77 78pub async fn base_url() -> &'static str { 79 SERVER_URL.get_or_init(|| { 80 let (tx, rx) = std::sync::mpsc::channel(); 81 std::thread::spawn(move || { 82 if std::env::var("DOCKER_HOST").is_err() { 83 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 84 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 85 if podman_sock.exists() { 86 unsafe { 87 std::env::set_var( 88 "DOCKER_HOST", 89 format!("unix://{}", podman_sock.display()), 90 ); 91 } 92 } 93 } 94 } 95 let rt = tokio::runtime::Runtime::new().unwrap(); 96 rt.block_on(async move { 97 if has_external_infra() { 98 let url = setup_with_external_infra().await; 99 tx.send(url).unwrap(); 100 } else { 101 let url = setup_with_testcontainers().await; 102 tx.send(url).unwrap(); 103 } 104 std::future::pending::<()>().await; 105 }); 106 }); 107 rx.recv().expect("Failed to start test server") 108 }) 109} 110 111async fn setup_with_external_infra() -> String { 112 let database_url = 113 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra"); 114 let s3_endpoint = 115 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra"); 116 unsafe { 117 std::env::set_var( 118 "S3_BUCKET", 119 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()), 120 ); 121 std::env::set_var( 122 "AWS_ACCESS_KEY_ID", 123 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()), 124 ); 125 std::env::set_var( 126 "AWS_SECRET_ACCESS_KEY", 127 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()), 128 ); 129 std::env::set_var( 130 "AWS_REGION", 131 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), 132 ); 133 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 134 } 135 let mock_server = MockServer::start().await; 136 setup_mock_appview(&mock_server).await; 137 unsafe { 138 std::env::set_var("APPVIEW_URL", mock_server.uri()); 139 } 140 MOCK_APPVIEW.set(mock_server).ok(); 141 spawn_app(database_url).await 142} 143 144#[cfg(not(feature = "external-infra"))] 145async fn setup_with_testcontainers() -> String { 146 let s3_container = GenericImage::new("minio/minio", "latest") 147 .with_exposed_port(ContainerPort::Tcp(9000)) 148 .with_env_var("MINIO_ROOT_USER", "minioadmin") 149 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 150 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 151 .with_label("bspds_test", "true") 152 .start() 153 .await 154 .expect("Failed to start MinIO"); 155 let s3_port = s3_container 156 .get_host_port_ipv4(9000) 157 .await 158 .expect("Failed to get S3 port"); 159 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 160 unsafe { 161 std::env::set_var("S3_BUCKET", "test-bucket"); 162 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 163 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 164 std::env::set_var("AWS_REGION", "us-east-1"); 165 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 166 } 167 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 168 .region("us-east-1") 169 .endpoint_url(&s3_endpoint) 170 .credentials_provider(Credentials::new( 171 "minioadmin", 172 "minioadmin", 173 None, 174 None, 175 "test", 176 )) 177 .load() 178 .await; 179 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 180 .force_path_style(true) 181 .build(); 182 let s3_client = S3Client::from_conf(s3_config); 183 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 184 let mock_server = MockServer::start().await; 185 setup_mock_appview(&mock_server).await; 186 unsafe { 187 std::env::set_var("APPVIEW_URL", mock_server.uri()); 188 } 189 MOCK_APPVIEW.set(mock_server).ok(); 190 S3_CONTAINER.set(s3_container).ok(); 191 let container = Postgres::default() 192 .with_tag("18-alpine") 193 .with_label("bspds_test", "true") 194 .start() 195 .await 196 .expect("Failed to start Postgres"); 197 let connection_string = format!( 198 "postgres://postgres:postgres@127.0.0.1:{}", 199 container 200 .get_host_port_ipv4(5432) 201 .await 202 .expect("Failed to get port") 203 ); 204 DB_CONTAINER.set(container).ok(); 205 spawn_app(connection_string).await 206} 207 208#[cfg(feature = "external-infra")] 209async fn setup_with_testcontainers() -> String { 210 panic!( 211 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT." 212 ); 213} 214 215async fn setup_mock_appview(mock_server: &MockServer) { 216 Mock::given(method("GET")) 217 .and(path("/xrpc/app.bsky.actor.getProfile")) 218 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 219 "handle": "mock.handle", 220 "did": "did:plc:mock", 221 "displayName": "Mock User" 222 }))) 223 .mount(mock_server) 224 .await; 225 Mock::given(method("GET")) 226 .and(path("/xrpc/app.bsky.actor.searchActors")) 227 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 228 "actors": [], 229 "cursor": null 230 }))) 231 .mount(mock_server) 232 .await; 233 Mock::given(method("GET")) 234 .and(path("/xrpc/app.bsky.feed.getTimeline")) 235 .respond_with( 236 ResponseTemplate::new(200) 237 .insert_header("atproto-repo-rev", "0") 238 .set_body_json(json!({ 239 "feed": [], 240 "cursor": null 241 })), 242 ) 243 .mount(mock_server) 244 .await; 245 Mock::given(method("GET")) 246 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 247 .respond_with( 248 ResponseTemplate::new(200) 249 .insert_header("atproto-repo-rev", "0") 250 .set_body_json(json!({ 251 "feed": [{ 252 "post": { 253 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 254 "cid": "bafyappview123", 255 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 256 "record": { 257 "$type": "app.bsky.feed.post", 258 "text": "Author feed post from appview", 259 "createdAt": "2025-01-01T00:00:00Z" 260 }, 261 "indexedAt": "2025-01-01T00:00:00Z" 262 } 263 }], 264 "cursor": "author-cursor" 265 })), 266 ) 267 .mount(mock_server) 268 .await; 269 Mock::given(method("GET")) 270 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 271 .respond_with( 272 ResponseTemplate::new(200) 273 .insert_header("atproto-repo-rev", "0") 274 .set_body_json(json!({ 275 "feed": [{ 276 "post": { 277 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 278 "cid": "bafyliked123", 279 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 280 "record": { 281 "$type": "app.bsky.feed.post", 282 "text": "Liked post from appview", 283 "createdAt": "2025-01-01T00:00:00Z" 284 }, 285 "indexedAt": "2025-01-01T00:00:00Z" 286 } 287 }], 288 "cursor": null 289 })), 290 ) 291 .mount(mock_server) 292 .await; 293 Mock::given(method("GET")) 294 .and(path("/xrpc/app.bsky.feed.getPostThread")) 295 .respond_with( 296 ResponseTemplate::new(200) 297 .insert_header("atproto-repo-rev", "0") 298 .set_body_json(json!({ 299 "thread": { 300 "$type": "app.bsky.feed.defs#threadViewPost", 301 "post": { 302 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 303 "cid": "bafythread123", 304 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 305 "record": { 306 "$type": "app.bsky.feed.post", 307 "text": "Thread post from appview", 308 "createdAt": "2025-01-01T00:00:00Z" 309 }, 310 "indexedAt": "2025-01-01T00:00:00Z" 311 }, 312 "replies": [] 313 } 314 })), 315 ) 316 .mount(mock_server) 317 .await; 318 Mock::given(method("GET")) 319 .and(path("/xrpc/app.bsky.feed.getFeed")) 320 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 321 "feed": [{ 322 "post": { 323 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 324 "cid": "bafyfeed123", 325 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 326 "record": { 327 "$type": "app.bsky.feed.post", 328 "text": "Custom feed post from appview", 329 "createdAt": "2025-01-01T00:00:00Z" 330 }, 331 "indexedAt": "2025-01-01T00:00:00Z" 332 } 333 }], 334 "cursor": null 335 }))) 336 .mount(mock_server) 337 .await; 338 Mock::given(method("POST")) 339 .and(path("/xrpc/app.bsky.notification.registerPush")) 340 .respond_with(ResponseTemplate::new(200)) 341 .mount(mock_server) 342 .await; 343} 344 345async fn spawn_app(database_url: String) -> String { 346 use bspds::rate_limit::RateLimiters; 347 let pool = PgPoolOptions::new() 348 .max_connections(50) 349 .connect(&database_url) 350 .await 351 .expect("Failed to connect to Postgres. Make sure the database is running."); 352 sqlx::migrate!("./migrations") 353 .run(&pool) 354 .await 355 .expect("Failed to run migrations"); 356 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 357 let addr = listener.local_addr().unwrap(); 358 APP_PORT.set(addr.port()).ok(); 359 unsafe { 360 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 361 } 362 let rate_limiters = RateLimiters::new() 363 .with_login_limit(10000) 364 .with_account_creation_limit(10000) 365 .with_password_reset_limit(10000) 366 .with_email_update_limit(10000) 367 .with_oauth_authorize_limit(10000) 368 .with_oauth_token_limit(10000); 369 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 370 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 371 let app = bspds::app(state); 372 tokio::spawn(async move { 373 axum::serve(listener, app).await.unwrap(); 374 }); 375 format!("http://{}", addr) 376} 377 378#[allow(dead_code)] 379pub async fn get_db_connection_string() -> String { 380 base_url().await; 381 if has_external_infra() { 382 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 383 } else { 384 #[cfg(not(feature = "external-infra"))] 385 { 386 let container = DB_CONTAINER.get().expect("DB container not initialized"); 387 let port = container 388 .get_host_port_ipv4(5432) 389 .await 390 .expect("Failed to get port"); 391 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 392 } 393 #[cfg(feature = "external-infra")] 394 { 395 panic!("DATABASE_URL must be set with external-infra feature"); 396 } 397 } 398} 399 400#[allow(dead_code)] 401pub async fn verify_new_account(client: &Client, did: &str) -> String { 402 let conn_str = get_db_connection_string().await; 403 let pool = sqlx::postgres::PgPoolOptions::new() 404 .max_connections(2) 405 .connect(&conn_str) 406 .await 407 .expect("Failed to connect to test database"); 408 let verification_code: String = sqlx::query_scalar!( 409 "SELECT email_confirmation_code FROM users WHERE did = $1", 410 did 411 ) 412 .fetch_one(&pool) 413 .await 414 .expect("Failed to get verification code") 415 .expect("No verification code found"); 416 let confirm_payload = json!({ 417 "did": did, 418 "verificationCode": verification_code 419 }); 420 let confirm_res = client 421 .post(format!( 422 "{}/xrpc/com.atproto.server.confirmSignup", 423 base_url().await 424 )) 425 .json(&confirm_payload) 426 .send() 427 .await 428 .expect("confirmSignup request failed"); 429 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 430 let confirm_body: Value = confirm_res 431 .json() 432 .await 433 .expect("Invalid JSON from confirmSignup"); 434 confirm_body["accessJwt"] 435 .as_str() 436 .expect("No accessJwt in confirmSignup response") 437 .to_string() 438} 439 440#[allow(dead_code)] 441pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 442 let res = client 443 .post(format!( 444 "{}/xrpc/com.atproto.repo.uploadBlob", 445 base_url().await 446 )) 447 .header(header::CONTENT_TYPE, mime) 448 .bearer_auth(AUTH_TOKEN) 449 .body(data) 450 .send() 451 .await 452 .expect("Failed to send uploadBlob request"); 453 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 454 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 455 body["blob"].clone() 456} 457 458#[allow(dead_code)] 459pub async fn create_test_post( 460 client: &Client, 461 text: &str, 462 reply_to: Option<Value>, 463) -> (String, String, String) { 464 let collection = "app.bsky.feed.post"; 465 let mut record = json!({ 466 "$type": collection, 467 "text": text, 468 "createdAt": Utc::now().to_rfc3339() 469 }); 470 if let Some(reply_obj) = reply_to { 471 record["reply"] = reply_obj; 472 } 473 let payload = json!({ 474 "repo": AUTH_DID, 475 "collection": collection, 476 "record": record 477 }); 478 let res = client 479 .post(format!( 480 "{}/xrpc/com.atproto.repo.createRecord", 481 base_url().await 482 )) 483 .bearer_auth(AUTH_TOKEN) 484 .json(&payload) 485 .send() 486 .await 487 .expect("Failed to send createRecord"); 488 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 489 let body: Value = res 490 .json() 491 .await 492 .expect("createRecord response was not JSON"); 493 let uri = body["uri"] 494 .as_str() 495 .expect("Response had no URI") 496 .to_string(); 497 let cid = body["cid"] 498 .as_str() 499 .expect("Response had no CID") 500 .to_string(); 501 let rkey = uri 502 .split('/') 503 .last() 504 .expect("URI was malformed") 505 .to_string(); 506 (uri, cid, rkey) 507} 508 509#[allow(dead_code)] 510pub async fn create_account_and_login(client: &Client) -> (String, String) { 511 let mut last_error = String::new(); 512 for attempt in 0..3 { 513 if attempt > 0 { 514 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 515 } 516 let handle = format!("user_{}", uuid::Uuid::new_v4()); 517 let payload = json!({ 518 "handle": handle, 519 "email": format!("{}@example.com", handle), 520 "password": "password" 521 }); 522 let res = match client 523 .post(format!( 524 "{}/xrpc/com.atproto.server.createAccount", 525 base_url().await 526 )) 527 .json(&payload) 528 .send() 529 .await 530 { 531 Ok(r) => r, 532 Err(e) => { 533 last_error = format!("Request failed: {}", e); 534 continue; 535 } 536 }; 537 if res.status() == StatusCode::OK { 538 let body: Value = res.json().await.expect("Invalid JSON"); 539 if let Some(access_jwt) = body["accessJwt"].as_str() { 540 let did = body["did"].as_str().expect("No did").to_string(); 541 return (access_jwt.to_string(), did); 542 } 543 let did = body["did"].as_str().expect("No did").to_string(); 544 let conn_str = get_db_connection_string().await; 545 let pool = sqlx::postgres::PgPoolOptions::new() 546 .max_connections(2) 547 .connect(&conn_str) 548 .await 549 .expect("Failed to connect to test database"); 550 let verification_code: String = sqlx::query_scalar!( 551 "SELECT email_confirmation_code FROM users WHERE did = $1", 552 &did 553 ) 554 .fetch_one(&pool) 555 .await 556 .expect("Failed to get verification code") 557 .expect("No verification code found"); 558 let confirm_payload = json!({ 559 "did": did, 560 "verificationCode": verification_code 561 }); 562 let confirm_res = client 563 .post(format!( 564 "{}/xrpc/com.atproto.server.confirmSignup", 565 base_url().await 566 )) 567 .json(&confirm_payload) 568 .send() 569 .await 570 .expect("confirmSignup request failed"); 571 if confirm_res.status() == StatusCode::OK { 572 let confirm_body: Value = confirm_res 573 .json() 574 .await 575 .expect("Invalid JSON from confirmSignup"); 576 let access_jwt = confirm_body["accessJwt"] 577 .as_str() 578 .expect("No accessJwt in confirmSignup response") 579 .to_string(); 580 return (access_jwt, did); 581 } 582 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 583 continue; 584 } 585 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 586 } 587 panic!("Failed to create account after 3 attempts: {}", last_error); 588}