// this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17 18static SERVER_URL: OnceLock<String> = OnceLock::new(); 19static APP_PORT: OnceLock<u16> = OnceLock::new(); 20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 21 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::core::ContainerPort; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 26#[cfg(not(feature = "external-infra"))] 27use testcontainers_modules::postgres::Postgres; 28#[cfg(not(feature = "external-infra"))] 29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 30#[cfg(not(feature = "external-infra"))] 31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 32 33#[allow(dead_code)] 34pub const AUTH_TOKEN: &str = "test-token"; 35#[allow(dead_code)] 36pub const BAD_AUTH_TOKEN: &str = "bad-token"; 37#[allow(dead_code)] 38pub const AUTH_DID: &str = "did:plc:fake"; 39#[allow(dead_code)] 40pub const TARGET_DID: &str = "did:plc:target"; 41 42fn has_external_infra() -> bool { 43 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 44 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 45} 46#[cfg(test)] 47#[ctor::dtor] 48fn cleanup() { 49 if has_external_infra() { 50 return; 51 } 52 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 53 let _ = std::process::Command::new("podman") 54 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 55 .output(); 56 } 57 let _ = 
std::process::Command::new("docker") 58 .args(&[ 59 "container", 60 "prune", 61 "-f", 62 "--filter", 63 "label=bspds_test=true", 64 ]) 65 .output(); 66} 67 68#[allow(dead_code)] 69pub fn client() -> Client { 70 Client::new() 71} 72 73#[allow(dead_code)] 74pub fn app_port() -> u16 { 75 *APP_PORT.get().expect("APP_PORT not initialized") 76} 77 78pub async fn base_url() -> &'static str { 79 SERVER_URL.get_or_init(|| { 80 let (tx, rx) = std::sync::mpsc::channel(); 81 std::thread::spawn(move || { 82 unsafe { 83 std::env::set_var("BSPDS_ALLOW_INSECURE_SECRETS", "1"); 84 } 85 if std::env::var("DOCKER_HOST").is_err() { 86 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 87 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 88 if podman_sock.exists() { 89 unsafe { 90 std::env::set_var( 91 "DOCKER_HOST", 92 format!("unix://{}", podman_sock.display()), 93 ); 94 } 95 } 96 } 97 } 98 let rt = tokio::runtime::Runtime::new().unwrap(); 99 rt.block_on(async move { 100 if has_external_infra() { 101 let url = setup_with_external_infra().await; 102 tx.send(url).unwrap(); 103 } else { 104 let url = setup_with_testcontainers().await; 105 tx.send(url).unwrap(); 106 } 107 std::future::pending::<()>().await; 108 }); 109 }); 110 rx.recv().expect("Failed to start test server") 111 }) 112} 113 114async fn setup_with_external_infra() -> String { 115 let database_url = 116 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra"); 117 let s3_endpoint = 118 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra"); 119 unsafe { 120 std::env::set_var( 121 "S3_BUCKET", 122 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()), 123 ); 124 std::env::set_var( 125 "AWS_ACCESS_KEY_ID", 126 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()), 127 ); 128 std::env::set_var( 129 "AWS_SECRET_ACCESS_KEY", 130 
std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()), 131 ); 132 std::env::set_var( 133 "AWS_REGION", 134 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()), 135 ); 136 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 137 } 138 let mock_server = MockServer::start().await; 139 setup_mock_appview(&mock_server).await; 140 let mock_uri = mock_server.uri(); 141 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri); 142 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A")); 143 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await; 144 unsafe { 145 std::env::set_var("APPVIEW_DID_APP_BSKY", &mock_did); 146 } 147 MOCK_APPVIEW.set(mock_server).ok(); 148 spawn_app(database_url).await 149} 150 151#[cfg(not(feature = "external-infra"))] 152async fn setup_with_testcontainers() -> String { 153 let s3_container = GenericImage::new("minio/minio", "latest") 154 .with_exposed_port(ContainerPort::Tcp(9000)) 155 .with_env_var("MINIO_ROOT_USER", "minioadmin") 156 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 157 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 158 .with_label("bspds_test", "true") 159 .start() 160 .await 161 .expect("Failed to start MinIO"); 162 let s3_port = s3_container 163 .get_host_port_ipv4(9000) 164 .await 165 .expect("Failed to get S3 port"); 166 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 167 unsafe { 168 std::env::set_var("S3_BUCKET", "test-bucket"); 169 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 170 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 171 std::env::set_var("AWS_REGION", "us-east-1"); 172 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 173 } 174 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 175 .region("us-east-1") 176 .endpoint_url(&s3_endpoint) 177 .credentials_provider(Credentials::new( 178 "minioadmin", 179 "minioadmin", 180 None, 181 None, 182 "test", 183 )) 184 .load() 185 .await; 
186 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 187 .force_path_style(true) 188 .build(); 189 let s3_client = S3Client::from_conf(s3_config); 190 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 191 let mock_server = MockServer::start().await; 192 setup_mock_appview(&mock_server).await; 193 let mock_uri = mock_server.uri(); 194 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri); 195 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A")); 196 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await; 197 unsafe { 198 std::env::set_var("APPVIEW_DID_APP_BSKY", &mock_did); 199 } 200 MOCK_APPVIEW.set(mock_server).ok(); 201 S3_CONTAINER.set(s3_container).ok(); 202 let container = Postgres::default() 203 .with_tag("18-alpine") 204 .with_label("bspds_test", "true") 205 .start() 206 .await 207 .expect("Failed to start Postgres"); 208 let connection_string = format!( 209 "postgres://postgres:postgres@127.0.0.1:{}", 210 container 211 .get_host_port_ipv4(5432) 212 .await 213 .expect("Failed to get port") 214 ); 215 DB_CONTAINER.set(container).ok(); 216 spawn_app(connection_string).await 217} 218 219#[cfg(feature = "external-infra")] 220async fn setup_with_testcontainers() -> String { 221 panic!( 222 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT." 
223 ); 224} 225 226async fn setup_mock_did_document(mock_server: &MockServer, did: &str, service_endpoint: &str) { 227 Mock::given(method("GET")) 228 .and(path("/.well-known/did.json")) 229 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 230 "id": did, 231 "service": [{ 232 "id": "#atproto_appview", 233 "type": "AtprotoAppView", 234 "serviceEndpoint": service_endpoint 235 }] 236 }))) 237 .mount(mock_server) 238 .await; 239} 240 241async fn setup_mock_appview(mock_server: &MockServer) { 242 Mock::given(method("GET")) 243 .and(path("/xrpc/app.bsky.actor.getProfile")) 244 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 245 "handle": "mock.handle", 246 "did": "did:plc:mock", 247 "displayName": "Mock User" 248 }))) 249 .mount(mock_server) 250 .await; 251 Mock::given(method("GET")) 252 .and(path("/xrpc/app.bsky.actor.searchActors")) 253 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 254 "actors": [], 255 "cursor": null 256 }))) 257 .mount(mock_server) 258 .await; 259 Mock::given(method("GET")) 260 .and(path("/xrpc/app.bsky.feed.getTimeline")) 261 .respond_with( 262 ResponseTemplate::new(200) 263 .insert_header("atproto-repo-rev", "0") 264 .set_body_json(json!({ 265 "feed": [], 266 "cursor": null 267 })), 268 ) 269 .mount(mock_server) 270 .await; 271 Mock::given(method("GET")) 272 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 273 .respond_with( 274 ResponseTemplate::new(200) 275 .insert_header("atproto-repo-rev", "0") 276 .set_body_json(json!({ 277 "feed": [{ 278 "post": { 279 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 280 "cid": "bafyappview123", 281 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 282 "record": { 283 "$type": "app.bsky.feed.post", 284 "text": "Author feed post from appview", 285 "createdAt": "2025-01-01T00:00:00Z" 286 }, 287 "indexedAt": "2025-01-01T00:00:00Z" 288 } 289 }], 290 "cursor": "author-cursor" 291 })), 292 ) 293 .mount(mock_server) 294 .await; 295 
Mock::given(method("GET")) 296 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 297 .respond_with( 298 ResponseTemplate::new(200) 299 .insert_header("atproto-repo-rev", "0") 300 .set_body_json(json!({ 301 "feed": [{ 302 "post": { 303 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 304 "cid": "bafyliked123", 305 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 306 "record": { 307 "$type": "app.bsky.feed.post", 308 "text": "Liked post from appview", 309 "createdAt": "2025-01-01T00:00:00Z" 310 }, 311 "indexedAt": "2025-01-01T00:00:00Z" 312 } 313 }], 314 "cursor": null 315 })), 316 ) 317 .mount(mock_server) 318 .await; 319 Mock::given(method("GET")) 320 .and(path("/xrpc/app.bsky.feed.getPostThread")) 321 .respond_with( 322 ResponseTemplate::new(200) 323 .insert_header("atproto-repo-rev", "0") 324 .set_body_json(json!({ 325 "thread": { 326 "$type": "app.bsky.feed.defs#threadViewPost", 327 "post": { 328 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 329 "cid": "bafythread123", 330 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 331 "record": { 332 "$type": "app.bsky.feed.post", 333 "text": "Thread post from appview", 334 "createdAt": "2025-01-01T00:00:00Z" 335 }, 336 "indexedAt": "2025-01-01T00:00:00Z" 337 }, 338 "replies": [] 339 } 340 })), 341 ) 342 .mount(mock_server) 343 .await; 344 Mock::given(method("GET")) 345 .and(path("/xrpc/app.bsky.feed.getFeed")) 346 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 347 "feed": [{ 348 "post": { 349 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 350 "cid": "bafyfeed123", 351 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 352 "record": { 353 "$type": "app.bsky.feed.post", 354 "text": "Custom feed post from appview", 355 "createdAt": "2025-01-01T00:00:00Z" 356 }, 357 "indexedAt": "2025-01-01T00:00:00Z" 358 } 359 }], 360 "cursor": null 361 }))) 362 .mount(mock_server) 363 .await; 364 Mock::given(method("POST")) 365 
.and(path("/xrpc/app.bsky.notification.registerPush")) 366 .respond_with(ResponseTemplate::new(200)) 367 .mount(mock_server) 368 .await; 369} 370 371async fn spawn_app(database_url: String) -> String { 372 use bspds::rate_limit::RateLimiters; 373 let pool = PgPoolOptions::new() 374 .max_connections(50) 375 .connect(&database_url) 376 .await 377 .expect("Failed to connect to Postgres. Make sure the database is running."); 378 sqlx::migrate!("./migrations") 379 .run(&pool) 380 .await 381 .expect("Failed to run migrations"); 382 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 383 let addr = listener.local_addr().unwrap(); 384 APP_PORT.set(addr.port()).ok(); 385 unsafe { 386 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 387 } 388 let rate_limiters = RateLimiters::new() 389 .with_login_limit(10000) 390 .with_account_creation_limit(10000) 391 .with_password_reset_limit(10000) 392 .with_email_update_limit(10000) 393 .with_oauth_authorize_limit(10000) 394 .with_oauth_token_limit(10000); 395 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 396 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 397 let app = bspds::app(state); 398 tokio::spawn(async move { 399 axum::serve(listener, app).await.unwrap(); 400 }); 401 format!("http://{}", addr) 402} 403 404#[allow(dead_code)] 405pub async fn get_db_connection_string() -> String { 406 base_url().await; 407 if has_external_infra() { 408 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 409 } else { 410 #[cfg(not(feature = "external-infra"))] 411 { 412 let container = DB_CONTAINER.get().expect("DB container not initialized"); 413 let port = container 414 .get_host_port_ipv4(5432) 415 .await 416 .expect("Failed to get port"); 417 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 418 } 419 #[cfg(feature = "external-infra")] 420 { 421 panic!("DATABASE_URL must be set with external-infra feature"); 422 } 423 } 424} 425 426#[allow(dead_code)] 
427pub async fn verify_new_account(client: &Client, did: &str) -> String { 428 let conn_str = get_db_connection_string().await; 429 let pool = sqlx::postgres::PgPoolOptions::new() 430 .max_connections(2) 431 .connect(&conn_str) 432 .await 433 .expect("Failed to connect to test database"); 434 let verification_code: String = sqlx::query_scalar!( 435 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'", 436 did 437 ) 438 .fetch_one(&pool) 439 .await 440 .expect("Failed to get verification code"); 441 442 let confirm_payload = json!({ 443 "did": did, 444 "verificationCode": verification_code 445 }); 446 let confirm_res = client 447 .post(format!( 448 "{}/xrpc/com.atproto.server.confirmSignup", 449 base_url().await 450 )) 451 .json(&confirm_payload) 452 .send() 453 .await 454 .expect("confirmSignup request failed"); 455 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 456 let confirm_body: Value = confirm_res 457 .json() 458 .await 459 .expect("Invalid JSON from confirmSignup"); 460 confirm_body["accessJwt"] 461 .as_str() 462 .expect("No accessJwt in confirmSignup response") 463 .to_string() 464} 465 466#[allow(dead_code)] 467pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 468 let res = client 469 .post(format!( 470 "{}/xrpc/com.atproto.repo.uploadBlob", 471 base_url().await 472 )) 473 .header(header::CONTENT_TYPE, mime) 474 .bearer_auth(AUTH_TOKEN) 475 .body(data) 476 .send() 477 .await 478 .expect("Failed to send uploadBlob request"); 479 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 480 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 481 body["blob"].clone() 482} 483 484#[allow(dead_code)] 485pub async fn create_test_post( 486 client: &Client, 487 text: &str, 488 reply_to: Option<Value>, 489) -> (String, String, String) { 490 let collection = "app.bsky.feed.post"; 491 let mut 
record = json!({ 492 "$type": collection, 493 "text": text, 494 "createdAt": Utc::now().to_rfc3339() 495 }); 496 if let Some(reply_obj) = reply_to { 497 record["reply"] = reply_obj; 498 } 499 let payload = json!({ 500 "repo": AUTH_DID, 501 "collection": collection, 502 "record": record 503 }); 504 let res = client 505 .post(format!( 506 "{}/xrpc/com.atproto.repo.createRecord", 507 base_url().await 508 )) 509 .bearer_auth(AUTH_TOKEN) 510 .json(&payload) 511 .send() 512 .await 513 .expect("Failed to send createRecord"); 514 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 515 let body: Value = res 516 .json() 517 .await 518 .expect("createRecord response was not JSON"); 519 let uri = body["uri"] 520 .as_str() 521 .expect("Response had no URI") 522 .to_string(); 523 let cid = body["cid"] 524 .as_str() 525 .expect("Response had no CID") 526 .to_string(); 527 let rkey = uri 528 .split('/') 529 .last() 530 .expect("URI was malformed") 531 .to_string(); 532 (uri, cid, rkey) 533} 534 535#[allow(dead_code)] 536pub async fn create_account_and_login(client: &Client) -> (String, String) { 537 create_account_and_login_internal(client, false).await 538} 539 540#[allow(dead_code)] 541pub async fn create_admin_account_and_login(client: &Client) -> (String, String) { 542 create_account_and_login_internal(client, true).await 543} 544 545async fn create_account_and_login_internal(client: &Client, make_admin: bool) -> (String, String) { 546 let mut last_error = String::new(); 547 for attempt in 0..3 { 548 if attempt > 0 { 549 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 550 } 551 let handle = format!("user_{}", uuid::Uuid::new_v4()); 552 let payload = json!({ 553 "handle": handle, 554 "email": format!("{}@example.com", handle), 555 "password": "password" 556 }); 557 let res = match client 558 .post(format!( 559 "{}/xrpc/com.atproto.server.createAccount", 560 base_url().await 561 )) 562 .json(&payload) 563 .send() 564 .await 565 
{ 566 Ok(r) => r, 567 Err(e) => { 568 last_error = format!("Request failed: {}", e); 569 continue; 570 } 571 }; 572 if res.status() == StatusCode::OK { 573 let body: Value = res.json().await.expect("Invalid JSON"); 574 let did = body["did"].as_str().expect("No did").to_string(); 575 let conn_str = get_db_connection_string().await; 576 let pool = sqlx::postgres::PgPoolOptions::new() 577 .max_connections(2) 578 .connect(&conn_str) 579 .await 580 .expect("Failed to connect to test database"); 581 if make_admin { 582 sqlx::query!("UPDATE users SET is_admin = TRUE WHERE did = $1", &did) 583 .execute(&pool) 584 .await 585 .expect("Failed to mark user as admin"); 586 } 587 if let Some(access_jwt) = body["accessJwt"].as_str() { 588 return (access_jwt.to_string(), did); 589 } 590 let verification_code: String = sqlx::query_scalar!( 591 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'", 592 &did 593 ) 594 .fetch_one(&pool) 595 .await 596 .expect("Failed to get verification code"); 597 598 let confirm_payload = json!({ 599 "did": did, 600 "verificationCode": verification_code 601 }); 602 let confirm_res = client 603 .post(format!( 604 "{}/xrpc/com.atproto.server.confirmSignup", 605 base_url().await 606 )) 607 .json(&confirm_payload) 608 .send() 609 .await 610 .expect("confirmSignup request failed"); 611 if confirm_res.status() == StatusCode::OK { 612 let confirm_body: Value = confirm_res 613 .json() 614 .await 615 .expect("Invalid JSON from confirmSignup"); 616 let access_jwt = confirm_body["accessJwt"] 617 .as_str() 618 .expect("No accessJwt in confirmSignup response") 619 .to_string(); 620 return (access_jwt, did); 621 } 622 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 623 continue; 624 } 625 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 626 } 627 panic!("Failed to create account after 3 attempts: {}", last_error); 628}