this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17 18static SERVER_URL: OnceLock<String> = OnceLock::new(); 19static APP_PORT: OnceLock<u16> = OnceLock::new(); 20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 21 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::core::ContainerPort; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 26#[cfg(not(feature = "external-infra"))] 27use testcontainers_modules::postgres::Postgres; 28#[cfg(not(feature = "external-infra"))] 29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 30#[cfg(not(feature = "external-infra"))] 31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 32 33#[allow(dead_code)] 34pub const AUTH_TOKEN: &str = "test-token"; 35#[allow(dead_code)] 36pub const BAD_AUTH_TOKEN: &str = "bad-token"; 37#[allow(dead_code)] 38pub const AUTH_DID: &str = "did:plc:fake"; 39#[allow(dead_code)] 40pub const TARGET_DID: &str = "did:plc:target"; 41 42fn has_external_infra() -> bool { 43 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 44 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 45} 46#[cfg(test)] 47#[ctor::dtor] 48fn cleanup() { 49 if has_external_infra() { 50 return; 51 } 52 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 53 let _ = std::process::Command::new("podman") 54 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 55 .output(); 56 } 57 let _ = std::process::Command::new("docker") 58 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 59 .output(); 60} 61 62#[allow(dead_code)] 63pub fn client() -> Client { 64 Client::new() 65} 66 67#[allow(dead_code)] 68pub fn app_port() -> u16 { 69 *APP_PORT.get().expect("APP_PORT not initialized") 70} 71 72pub async fn base_url() -> &'static str { 73 SERVER_URL.get_or_init(|| { 74 let (tx, rx) = std::sync::mpsc::channel(); 75 std::thread::spawn(move || { 76 if std::env::var("DOCKER_HOST").is_err() { 77 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 78 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 79 if podman_sock.exists() { 80 unsafe { 81 std::env::set_var( 82 "DOCKER_HOST", 83 format!("unix://{}", podman_sock.display()), 84 ); 85 } 86 } 87 } 88 } 89 let rt = tokio::runtime::Runtime::new().unwrap(); 90 rt.block_on(async move { 91 if has_external_infra() { 92 let url = setup_with_external_infra().await; 93 tx.send(url).unwrap(); 94 } else { 95 let url = setup_with_testcontainers().await; 96 tx.send(url).unwrap(); 97 } 98 std::future::pending::<()>().await; 99 }); 100 }); 101 rx.recv().expect("Failed to start test server") 102 }) 103} 104 105async fn setup_with_external_infra() -> String { 106 let database_url = std::env::var("DATABASE_URL") 107 .expect("DATABASE_URL must be set when using external infra"); 108 let s3_endpoint = std::env::var("S3_ENDPOINT") 109 .expect("S3_ENDPOINT must be set when using external infra"); 110 unsafe { 111 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string())); 112 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string())); 113 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string())); 114 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string())); 115 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 116 } 117 let mock_server = MockServer::start().await; 118 setup_mock_appview(&mock_server).await; 119 unsafe { 120 std::env::set_var("APPVIEW_URL", mock_server.uri()); 121 } 122 MOCK_APPVIEW.set(mock_server).ok(); 123 spawn_app(database_url).await 124} 125 126#[cfg(not(feature = "external-infra"))] 127async fn setup_with_testcontainers() -> String { 128 let s3_container = GenericImage::new("minio/minio", "latest") 129 .with_exposed_port(ContainerPort::Tcp(9000)) 130 .with_env_var("MINIO_ROOT_USER", "minioadmin") 131 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 132 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 133 .with_label("bspds_test", "true") 134 .start() 135 .await 136 .expect("Failed to start MinIO"); 137 let s3_port = s3_container 138 .get_host_port_ipv4(9000) 139 .await 140 .expect("Failed to get S3 port"); 141 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 142 unsafe { 143 std::env::set_var("S3_BUCKET", "test-bucket"); 144 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 145 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 146 std::env::set_var("AWS_REGION", "us-east-1"); 147 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 148 } 149 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 150 .region("us-east-1") 151 .endpoint_url(&s3_endpoint) 152 .credentials_provider(Credentials::new( 153 "minioadmin", 154 "minioadmin", 155 None, 156 None, 157 "test", 158 )) 159 .load() 160 .await; 161 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 162 .force_path_style(true) 163 .build(); 164 let s3_client = S3Client::from_conf(s3_config); 165 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 166 let mock_server = MockServer::start().await; 167 setup_mock_appview(&mock_server).await; 168 unsafe { 169 std::env::set_var("APPVIEW_URL", mock_server.uri()); 170 } 171 MOCK_APPVIEW.set(mock_server).ok(); 172 S3_CONTAINER.set(s3_container).ok(); 173 let container = Postgres::default() 174 .with_tag("18-alpine") 175 .with_label("bspds_test", "true") 176 .start() 177 .await 178 .expect("Failed to start Postgres"); 179 let connection_string = format!( 180 "postgres://postgres:postgres@127.0.0.1:{}", 181 container 182 .get_host_port_ipv4(5432) 183 .await 184 .expect("Failed to get port") 185 ); 186 DB_CONTAINER.set(container).ok(); 187 spawn_app(connection_string).await 188} 189 190#[cfg(feature = "external-infra")] 191async fn setup_with_testcontainers() -> String { 192 panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."); 193} 194 195async fn setup_mock_appview(mock_server: &MockServer) { 196 Mock::given(method("GET")) 197 .and(path("/xrpc/app.bsky.actor.getProfile")) 198 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 199 "handle": "mock.handle", 200 "did": "did:plc:mock", 201 "displayName": "Mock User" 202 }))) 203 .mount(mock_server) 204 .await; 205 Mock::given(method("GET")) 206 .and(path("/xrpc/app.bsky.actor.searchActors")) 207 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 208 "actors": [], 209 "cursor": null 210 }))) 211 .mount(mock_server) 212 .await; 213 Mock::given(method("GET")) 214 .and(path("/xrpc/app.bsky.feed.getTimeline")) 215 .respond_with( 216 ResponseTemplate::new(200) 217 .insert_header("atproto-repo-rev", "0") 218 .set_body_json(json!({ 219 "feed": [], 220 "cursor": null 221 })) 222 ) 223 .mount(mock_server) 224 .await; 225 Mock::given(method("GET")) 226 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 227 .respond_with( 228 ResponseTemplate::new(200) 229 .insert_header("atproto-repo-rev", "0") 230 .set_body_json(json!({ 231 "feed": [{ 232 "post": { 233 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 234 "cid": "bafyappview123", 235 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 236 "record": { 237 "$type": "app.bsky.feed.post", 238 "text": "Author feed post from appview", 239 "createdAt": "2025-01-01T00:00:00Z" 240 }, 241 "indexedAt": "2025-01-01T00:00:00Z" 242 } 243 }], 244 "cursor": "author-cursor" 245 })), 246 ) 247 .mount(mock_server) 248 .await; 249 Mock::given(method("GET")) 250 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 251 .respond_with( 252 ResponseTemplate::new(200) 253 .insert_header("atproto-repo-rev", "0") 254 .set_body_json(json!({ 255 "feed": [{ 256 "post": { 257 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 258 "cid": "bafyliked123", 259 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 260 "record": { 261 "$type": "app.bsky.feed.post", 262 "text": "Liked post from appview", 263 "createdAt": "2025-01-01T00:00:00Z" 264 }, 265 "indexedAt": "2025-01-01T00:00:00Z" 266 } 267 }], 268 "cursor": null 269 })), 270 ) 271 .mount(mock_server) 272 .await; 273 Mock::given(method("GET")) 274 .and(path("/xrpc/app.bsky.feed.getPostThread")) 275 .respond_with( 276 ResponseTemplate::new(200) 277 .insert_header("atproto-repo-rev", "0") 278 .set_body_json(json!({ 279 "thread": { 280 "$type": "app.bsky.feed.defs#threadViewPost", 281 "post": { 282 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 283 "cid": "bafythread123", 284 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 285 "record": { 286 "$type": "app.bsky.feed.post", 287 "text": "Thread post from appview", 288 "createdAt": "2025-01-01T00:00:00Z" 289 }, 290 "indexedAt": "2025-01-01T00:00:00Z" 291 }, 292 "replies": [] 293 } 294 })), 295 ) 296 .mount(mock_server) 297 .await; 298 Mock::given(method("GET")) 299 .and(path("/xrpc/app.bsky.feed.getFeed")) 300 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 301 "feed": [{ 302 "post": { 303 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 304 "cid": "bafyfeed123", 305 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 306 "record": { 307 "$type": "app.bsky.feed.post", 308 "text": "Custom feed post from appview", 309 "createdAt": "2025-01-01T00:00:00Z" 310 }, 311 "indexedAt": "2025-01-01T00:00:00Z" 312 } 313 }], 314 "cursor": null 315 }))) 316 .mount(mock_server) 317 .await; 318 Mock::given(method("POST")) 319 .and(path("/xrpc/app.bsky.notification.registerPush")) 320 .respond_with(ResponseTemplate::new(200)) 321 .mount(mock_server) 322 .await; 323} 324 325async fn spawn_app(database_url: String) -> String { 326 use bspds::rate_limit::RateLimiters; 327 let pool = PgPoolOptions::new() 328 .max_connections(50) 329 .connect(&database_url) 330 .await 331 .expect("Failed to connect to Postgres. Make sure the database is running."); 332 sqlx::migrate!("./migrations") 333 .run(&pool) 334 .await 335 .expect("Failed to run migrations"); 336 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 337 let addr = listener.local_addr().unwrap(); 338 APP_PORT.set(addr.port()).ok(); 339 unsafe { 340 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 341 } 342 let rate_limiters = RateLimiters::new() 343 .with_login_limit(10000) 344 .with_account_creation_limit(10000) 345 .with_password_reset_limit(10000) 346 .with_email_update_limit(10000) 347 .with_oauth_authorize_limit(10000) 348 .with_oauth_token_limit(10000); 349 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 350 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 351 let app = bspds::app(state); 352 tokio::spawn(async move { 353 axum::serve(listener, app).await.unwrap(); 354 }); 355 format!("http://{}", addr) 356} 357 358#[allow(dead_code)] 359pub async fn get_db_connection_string() -> String { 360 base_url().await; 361 if has_external_infra() { 362 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 363 } else { 364 #[cfg(not(feature = "external-infra"))] 365 { 366 let container = DB_CONTAINER.get().expect("DB container not initialized"); 367 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port"); 368 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 369 } 370 #[cfg(feature = "external-infra")] 371 { 372 panic!("DATABASE_URL must be set with external-infra feature"); 373 } 374 } 375} 376 377#[allow(dead_code)] 378pub async fn verify_new_account(client: &Client, did: &str) -> String { 379 let conn_str = get_db_connection_string().await; 380 let pool = sqlx::postgres::PgPoolOptions::new() 381 .max_connections(2) 382 .connect(&conn_str) 383 .await 384 .expect("Failed to connect to test database"); 385 let verification_code: String = sqlx::query_scalar!( 386 "SELECT email_confirmation_code FROM users WHERE did = $1", 387 did 388 ) 389 .fetch_one(&pool) 390 .await 391 .expect("Failed to get verification code") 392 .expect("No verification code found"); 393 let confirm_payload = json!({ 394 "did": did, 395 "verificationCode": verification_code 396 }); 397 let confirm_res = client 398 .post(format!( 399 "{}/xrpc/com.atproto.server.confirmSignup", 400 base_url().await 401 )) 402 .json(&confirm_payload) 403 .send() 404 .await 405 .expect("confirmSignup request failed"); 406 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 407 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 408 confirm_body["accessJwt"] 409 .as_str() 410 .expect("No accessJwt in confirmSignup response") 411 .to_string() 412} 413 414#[allow(dead_code)] 415pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 416 let res = client 417 .post(format!( 418 "{}/xrpc/com.atproto.repo.uploadBlob", 419 base_url().await 420 )) 421 .header(header::CONTENT_TYPE, mime) 422 .bearer_auth(AUTH_TOKEN) 423 .body(data) 424 .send() 425 .await 426 .expect("Failed to send uploadBlob request"); 427 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 428 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 429 body["blob"].clone() 430} 431 432#[allow(dead_code)] 433pub async fn create_test_post( 434 client: &Client, 435 text: &str, 436 reply_to: Option<Value>, 437) -> (String, String, String) { 438 let collection = "app.bsky.feed.post"; 439 let mut record = json!({ 440 "$type": collection, 441 "text": text, 442 "createdAt": Utc::now().to_rfc3339() 443 }); 444 if let Some(reply_obj) = reply_to { 445 record["reply"] = reply_obj; 446 } 447 let payload = json!({ 448 "repo": AUTH_DID, 449 "collection": collection, 450 "record": record 451 }); 452 let res = client 453 .post(format!( 454 "{}/xrpc/com.atproto.repo.createRecord", 455 base_url().await 456 )) 457 .bearer_auth(AUTH_TOKEN) 458 .json(&payload) 459 .send() 460 .await 461 .expect("Failed to send createRecord"); 462 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 463 let body: Value = res 464 .json() 465 .await 466 .expect("createRecord response was not JSON"); 467 let uri = body["uri"] 468 .as_str() 469 .expect("Response had no URI") 470 .to_string(); 471 let cid = body["cid"] 472 .as_str() 473 .expect("Response had no CID") 474 .to_string(); 475 let rkey = uri 476 .split('/') 477 .last() 478 .expect("URI was malformed") 479 .to_string(); 480 (uri, cid, rkey) 481} 482 483#[allow(dead_code)] 484pub async fn create_account_and_login(client: &Client) -> (String, String) { 485 let mut last_error = String::new(); 486 for attempt in 0..3 { 487 if attempt > 0 { 488 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 489 } 490 let handle = format!("user_{}", uuid::Uuid::new_v4()); 491 let payload = json!({ 492 "handle": handle, 493 "email": format!("{}@example.com", handle), 494 "password": "password" 495 }); 496 let res = match client 497 .post(format!( 498 "{}/xrpc/com.atproto.server.createAccount", 499 base_url().await 500 )) 501 .json(&payload) 502 .send() 503 .await 504 { 505 Ok(r) => r, 506 Err(e) => { 507 last_error = format!("Request failed: {}", e); 508 continue; 509 } 510 }; 511 if res.status() == StatusCode::OK { 512 let body: Value = res.json().await.expect("Invalid JSON"); 513 if let Some(access_jwt) = body["accessJwt"].as_str() { 514 let did = body["did"].as_str().expect("No did").to_string(); 515 return (access_jwt.to_string(), did); 516 } 517 let did = body["did"].as_str().expect("No did").to_string(); 518 let conn_str = get_db_connection_string().await; 519 let pool = sqlx::postgres::PgPoolOptions::new() 520 .max_connections(2) 521 .connect(&conn_str) 522 .await 523 .expect("Failed to connect to test database"); 524 let verification_code: String = sqlx::query_scalar!( 525 "SELECT email_confirmation_code FROM users WHERE did = $1", 526 &did 527 ) 528 .fetch_one(&pool) 529 .await 530 .expect("Failed to get verification code") 531 .expect("No verification code found"); 532 let confirm_payload = json!({ 533 "did": did, 534 "verificationCode": verification_code 535 }); 536 let confirm_res = client 537 .post(format!( 538 "{}/xrpc/com.atproto.server.confirmSignup", 539 base_url().await 540 )) 541 .json(&confirm_payload) 542 .send() 543 .await 544 .expect("confirmSignup request failed"); 545 if confirm_res.status() == StatusCode::OK { 546 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 547 let access_jwt = confirm_body["accessJwt"] 548 .as_str() 549 .expect("No accessJwt in confirmSignup response") 550 .to_string(); 551 return (access_jwt, did); 552 } 553 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 554 continue; 555 } 556 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 557 } 558 panic!("Failed to create account after 3 attempts: {}", last_error); 559}