this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17 18static SERVER_URL: OnceLock<String> = OnceLock::new(); 19static APP_PORT: OnceLock<u16> = OnceLock::new(); 20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 21 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::core::ContainerPort; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 26#[cfg(not(feature = "external-infra"))] 27use testcontainers_modules::postgres::Postgres; 28#[cfg(not(feature = "external-infra"))] 29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 30#[cfg(not(feature = "external-infra"))] 31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 32 33#[allow(dead_code)] 34pub const AUTH_TOKEN: &str = "test-token"; 35#[allow(dead_code)] 36pub const BAD_AUTH_TOKEN: &str = "bad-token"; 37#[allow(dead_code)] 38pub const AUTH_DID: &str = "did:plc:fake"; 39#[allow(dead_code)] 40pub const TARGET_DID: &str = "did:plc:target"; 41 42fn has_external_infra() -> bool { 43 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 44 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 45} 46#[cfg(test)] 47#[ctor::dtor] 48fn cleanup() { 49 if has_external_infra() { 50 return; 51 } 52 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 53 let _ = std::process::Command::new("podman") 54 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 55 .output(); 56 } 57 let _ = std::process::Command::new("docker") 58 .args(&[ 59 "container", 60 "prune", 61 "-f", 62 "--filter", 63 "label=bspds_test=true", 64 ]) 65 .output(); 66} 67 68#[allow(dead_code)] 69pub fn client() -> Client { 70 Client::new() 71} 72 73#[allow(dead_code)] 74pub fn app_port() -> u16 { 75 *APP_PORT.get().expect("APP_PORT not initialized") 76} 77 78pub async fn base_url() -> &'static str { 79 SERVER_URL.get_or_init(|| { 80 let (tx, rx) = std::sync::mpsc::channel(); 81 std::thread::spawn(move || { 82 unsafe { 83 std::env::set_var("BSPDS_ALLOW_INSECURE_SECRETS", "1"); 84 } 85 if std::env::var("DOCKER_HOST").is_err() { 86 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 87 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 88 if podman_sock.exists() { 89 unsafe { 90 std::env::set_var( 91 "DOCKER_HOST", 92 format!("unix://{}", podman_sock.display()), 93 ); 94 } 95 } 96 } 97 } 98 let rt = tokio::runtime::Runtime::new().unwrap(); 99 rt.block_on(async move { 100 if has_external_infra() { 101 let url = setup_with_external_infra().await; 102 tx.send(url).unwrap(); 103 } else { 104 let url = setup_with_testcontainers().await; 105 tx.send(url).unwrap(); 106 } 107 std::future::pending::<()>().await; 108 }); 109 }); 110 rx.recv().expect("Failed to start test server") 111 }) 112} 113 114async fn setup_with_external_infra() -> String { 115 let database_url = 116 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra"); 117 let s3_endpoint = 118 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra"); 119 unsafe { 120 std::env::set_var( 121 "S3_BUCKET", 122 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()), 123 ); 124 std::env::set_var( 125 "AWS_ACCESS_KEY_ID", 126 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()), 127 ); 128 std::env::set_var( 129 "AWS_SECRET_ACCESS_KEY", 130 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()), 131 ); 132 std::env::set_var( 133 "AWS_REGION", 134 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), 135 ); 136 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 137 } 138 let mock_server = MockServer::start().await; 139 setup_mock_appview(&mock_server).await; 140 unsafe { 141 std::env::set_var("APPVIEW_URL", mock_server.uri()); 142 } 143 MOCK_APPVIEW.set(mock_server).ok(); 144 spawn_app(database_url).await 145} 146 147#[cfg(not(feature = "external-infra"))] 148async fn setup_with_testcontainers() -> String { 149 let s3_container = GenericImage::new("minio/minio", "latest") 150 .with_exposed_port(ContainerPort::Tcp(9000)) 151 .with_env_var("MINIO_ROOT_USER", "minioadmin") 152 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 153 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 154 .with_label("bspds_test", "true") 155 .start() 156 .await 157 .expect("Failed to start MinIO"); 158 let s3_port = s3_container 159 .get_host_port_ipv4(9000) 160 .await 161 .expect("Failed to get S3 port"); 162 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 163 unsafe { 164 std::env::set_var("S3_BUCKET", "test-bucket"); 165 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 166 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 167 std::env::set_var("AWS_REGION", "us-east-1"); 168 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 169 } 170 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 171 .region("us-east-1") 172 .endpoint_url(&s3_endpoint) 173 .credentials_provider(Credentials::new( 174 "minioadmin", 175 "minioadmin", 176 None, 177 None, 178 "test", 179 )) 180 .load() 181 .await; 182 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 183 .force_path_style(true) 184 .build(); 185 let s3_client = S3Client::from_conf(s3_config); 186 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 187 let mock_server = MockServer::start().await; 188 setup_mock_appview(&mock_server).await; 189 unsafe { 190 std::env::set_var("APPVIEW_URL", mock_server.uri()); 191 } 192 MOCK_APPVIEW.set(mock_server).ok(); 193 S3_CONTAINER.set(s3_container).ok(); 194 let container = Postgres::default() 195 .with_tag("18-alpine") 196 .with_label("bspds_test", "true") 197 .start() 198 .await 199 .expect("Failed to start Postgres"); 200 let connection_string = format!( 201 "postgres://postgres:postgres@127.0.0.1:{}", 202 container 203 .get_host_port_ipv4(5432) 204 .await 205 .expect("Failed to get port") 206 ); 207 DB_CONTAINER.set(container).ok(); 208 spawn_app(connection_string).await 209} 210 211#[cfg(feature = "external-infra")] 212async fn setup_with_testcontainers() -> String { 213 panic!( 214 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT." 215 ); 216} 217 218async fn setup_mock_appview(mock_server: &MockServer) { 219 Mock::given(method("GET")) 220 .and(path("/xrpc/app.bsky.actor.getProfile")) 221 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 222 "handle": "mock.handle", 223 "did": "did:plc:mock", 224 "displayName": "Mock User" 225 }))) 226 .mount(mock_server) 227 .await; 228 Mock::given(method("GET")) 229 .and(path("/xrpc/app.bsky.actor.searchActors")) 230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 231 "actors": [], 232 "cursor": null 233 }))) 234 .mount(mock_server) 235 .await; 236 Mock::given(method("GET")) 237 .and(path("/xrpc/app.bsky.feed.getTimeline")) 238 .respond_with( 239 ResponseTemplate::new(200) 240 .insert_header("atproto-repo-rev", "0") 241 .set_body_json(json!({ 242 "feed": [], 243 "cursor": null 244 })), 245 ) 246 .mount(mock_server) 247 .await; 248 Mock::given(method("GET")) 249 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 250 .respond_with( 251 ResponseTemplate::new(200) 252 .insert_header("atproto-repo-rev", "0") 253 .set_body_json(json!({ 254 "feed": [{ 255 "post": { 256 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 257 "cid": "bafyappview123", 258 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 259 "record": { 260 "$type": "app.bsky.feed.post", 261 "text": "Author feed post from appview", 262 "createdAt": "2025-01-01T00:00:00Z" 263 }, 264 "indexedAt": "2025-01-01T00:00:00Z" 265 } 266 }], 267 "cursor": "author-cursor" 268 })), 269 ) 270 .mount(mock_server) 271 .await; 272 Mock::given(method("GET")) 273 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 274 .respond_with( 275 ResponseTemplate::new(200) 276 .insert_header("atproto-repo-rev", "0") 277 .set_body_json(json!({ 278 "feed": [{ 279 "post": { 280 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 281 "cid": "bafyliked123", 282 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 283 "record": { 284 "$type": "app.bsky.feed.post", 285 "text": "Liked post from appview", 286 "createdAt": "2025-01-01T00:00:00Z" 287 }, 288 "indexedAt": "2025-01-01T00:00:00Z" 289 } 290 }], 291 "cursor": null 292 })), 293 ) 294 .mount(mock_server) 295 .await; 296 Mock::given(method("GET")) 297 .and(path("/xrpc/app.bsky.feed.getPostThread")) 298 .respond_with( 299 ResponseTemplate::new(200) 300 .insert_header("atproto-repo-rev", "0") 301 .set_body_json(json!({ 302 "thread": { 303 "$type": "app.bsky.feed.defs#threadViewPost", 304 "post": { 305 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 306 "cid": "bafythread123", 307 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 308 "record": { 309 "$type": "app.bsky.feed.post", 310 "text": "Thread post from appview", 311 "createdAt": "2025-01-01T00:00:00Z" 312 }, 313 "indexedAt": "2025-01-01T00:00:00Z" 314 }, 315 "replies": [] 316 } 317 })), 318 ) 319 .mount(mock_server) 320 .await; 321 Mock::given(method("GET")) 322 .and(path("/xrpc/app.bsky.feed.getFeed")) 323 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 324 "feed": [{ 325 "post": { 326 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 327 "cid": "bafyfeed123", 328 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 329 "record": { 330 "$type": "app.bsky.feed.post", 331 "text": "Custom feed post from appview", 332 "createdAt": "2025-01-01T00:00:00Z" 333 }, 334 "indexedAt": "2025-01-01T00:00:00Z" 335 } 336 }], 337 "cursor": null 338 }))) 339 .mount(mock_server) 340 .await; 341 Mock::given(method("POST")) 342 .and(path("/xrpc/app.bsky.notification.registerPush")) 343 .respond_with(ResponseTemplate::new(200)) 344 .mount(mock_server) 345 .await; 346} 347 348async fn spawn_app(database_url: String) -> String { 349 use bspds::rate_limit::RateLimiters; 350 let pool = PgPoolOptions::new() 351 .max_connections(50) 352 .connect(&database_url) 353 .await 354 .expect("Failed to connect to Postgres. Make sure the database is running."); 355 sqlx::migrate!("./migrations") 356 .run(&pool) 357 .await 358 .expect("Failed to run migrations"); 359 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 360 let addr = listener.local_addr().unwrap(); 361 APP_PORT.set(addr.port()).ok(); 362 unsafe { 363 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 364 } 365 let rate_limiters = RateLimiters::new() 366 .with_login_limit(10000) 367 .with_account_creation_limit(10000) 368 .with_password_reset_limit(10000) 369 .with_email_update_limit(10000) 370 .with_oauth_authorize_limit(10000) 371 .with_oauth_token_limit(10000); 372 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 373 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 374 let app = bspds::app(state); 375 tokio::spawn(async move { 376 axum::serve(listener, app).await.unwrap(); 377 }); 378 format!("http://{}", addr) 379} 380 381#[allow(dead_code)] 382pub async fn get_db_connection_string() -> String { 383 base_url().await; 384 if has_external_infra() { 385 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 386 } else { 387 #[cfg(not(feature = "external-infra"))] 388 { 389 let container = DB_CONTAINER.get().expect("DB container not initialized"); 390 let port = container 391 .get_host_port_ipv4(5432) 392 .await 393 .expect("Failed to get port"); 394 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 395 } 396 #[cfg(feature = "external-infra")] 397 { 398 panic!("DATABASE_URL must be set with external-infra feature"); 399 } 400 } 401} 402 403#[allow(dead_code)] 404pub async fn verify_new_account(client: &Client, did: &str) -> String { 405 let conn_str = get_db_connection_string().await; 406 let pool = sqlx::postgres::PgPoolOptions::new() 407 .max_connections(2) 408 .connect(&conn_str) 409 .await 410 .expect("Failed to connect to test database"); 411 let verification_code: String = sqlx::query_scalar!( 412 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'", 413 did 414 ) 415 .fetch_one(&pool) 416 .await 417 .expect("Failed to get verification code"); 418 419 let confirm_payload = json!({ 420 "did": did, 421 "verificationCode": verification_code 422 }); 423 let confirm_res = client 424 .post(format!( 425 "{}/xrpc/com.atproto.server.confirmSignup", 426 base_url().await 427 )) 428 .json(&confirm_payload) 429 .send() 430 .await 431 .expect("confirmSignup request failed"); 432 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 433 let confirm_body: Value = confirm_res 434 .json() 435 .await 436 .expect("Invalid JSON from confirmSignup"); 437 confirm_body["accessJwt"] 438 .as_str() 439 .expect("No accessJwt in confirmSignup response") 440 .to_string() 441} 442 443#[allow(dead_code)] 444pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 445 let res = client 446 .post(format!( 447 "{}/xrpc/com.atproto.repo.uploadBlob", 448 base_url().await 449 )) 450 .header(header::CONTENT_TYPE, mime) 451 .bearer_auth(AUTH_TOKEN) 452 .body(data) 453 .send() 454 .await 455 .expect("Failed to send uploadBlob request"); 456 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 457 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 458 body["blob"].clone() 459} 460 461#[allow(dead_code)] 462pub async fn create_test_post( 463 client: &Client, 464 text: &str, 465 reply_to: Option<Value>, 466) -> (String, String, String) { 467 let collection = "app.bsky.feed.post"; 468 let mut record = json!({ 469 "$type": collection, 470 "text": text, 471 "createdAt": Utc::now().to_rfc3339() 472 }); 473 if let Some(reply_obj) = reply_to { 474 record["reply"] = reply_obj; 475 } 476 let payload = json!({ 477 "repo": AUTH_DID, 478 "collection": collection, 479 "record": record 480 }); 481 let res = client 482 .post(format!( 483 "{}/xrpc/com.atproto.repo.createRecord", 484 base_url().await 485 )) 486 .bearer_auth(AUTH_TOKEN) 487 .json(&payload) 488 .send() 489 .await 490 .expect("Failed to send createRecord"); 491 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 492 let body: Value = res 493 .json() 494 .await 495 .expect("createRecord response was not JSON"); 496 let uri = body["uri"] 497 .as_str() 498 .expect("Response had no URI") 499 .to_string(); 500 let cid = body["cid"] 501 .as_str() 502 .expect("Response had no CID") 503 .to_string(); 504 let rkey = uri 505 .split('/') 506 .last() 507 .expect("URI was malformed") 508 .to_string(); 509 (uri, cid, rkey) 510} 511 512#[allow(dead_code)] 513pub async fn create_account_and_login(client: &Client) -> (String, String) { 514 let mut last_error = String::new(); 515 for attempt in 0..3 { 516 if attempt > 0 { 517 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 518 } 519 let handle = format!("user_{}", uuid::Uuid::new_v4()); 520 let payload = json!({ 521 "handle": handle, 522 "email": format!("{}@example.com", handle), 523 "password": "password" 524 }); 525 let res = match client 526 .post(format!( 527 "{}/xrpc/com.atproto.server.createAccount", 528 base_url().await 529 )) 530 .json(&payload) 531 .send() 532 .await 533 { 534 Ok(r) => r, 535 Err(e) => { 536 last_error = format!("Request failed: {}", e); 537 continue; 538 } 539 }; 540 if res.status() == StatusCode::OK { 541 let body: Value = res.json().await.expect("Invalid JSON"); 542 if let Some(access_jwt) = body["accessJwt"].as_str() { 543 let did = body["did"].as_str().expect("No did").to_string(); 544 return (access_jwt.to_string(), did); 545 } 546 let did = body["did"].as_str().expect("No did").to_string(); 547 let conn_str = get_db_connection_string().await; 548 let pool = sqlx::postgres::PgPoolOptions::new() 549 .max_connections(2) 550 .connect(&conn_str) 551 .await 552 .expect("Failed to connect to test database"); 553 let verification_code: String = sqlx::query_scalar!( 554 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'", 555 &did 556 ) 557 .fetch_one(&pool) 558 .await 559 .expect("Failed to get verification code"); 560 561 let confirm_payload = json!({ 562 "did": did, 563 "verificationCode": verification_code 564 }); 565 let confirm_res = client 566 .post(format!( 567 "{}/xrpc/com.atproto.server.confirmSignup", 568 base_url().await 569 )) 570 .json(&confirm_payload) 571 .send() 572 .await 573 .expect("confirmSignup request failed"); 574 if confirm_res.status() == StatusCode::OK { 575 let confirm_body: Value = confirm_res 576 .json() 577 .await 578 .expect("Invalid JSON from confirmSignup"); 579 let access_jwt = confirm_body["accessJwt"] 580 .as_str() 581 .expect("No accessJwt in confirmSignup response") 582 .to_string(); 583 return (access_jwt, did); 584 } 585 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 586 continue; 587 } 588 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 589 } 590 panic!("Failed to create account after 3 attempts: {}", last_error); 591}