this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use tokio::net::TcpListener; 15use wiremock::matchers::{method, path}; 16use wiremock::{Mock, MockServer, ResponseTemplate}; 17static SERVER_URL: OnceLock<String> = OnceLock::new(); 18static APP_PORT: OnceLock<u16> = OnceLock::new(); 19static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 20#[cfg(not(feature = "external-infra"))] 21use testcontainers::core::ContainerPort; 22#[cfg(not(feature = "external-infra"))] 23use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 24#[cfg(not(feature = "external-infra"))] 25use testcontainers_modules::postgres::Postgres; 26#[cfg(not(feature = "external-infra"))] 27static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 28#[cfg(not(feature = "external-infra"))] 29static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 30#[allow(dead_code)] 31pub const AUTH_TOKEN: &str = "test-token"; 32#[allow(dead_code)] 33pub const BAD_AUTH_TOKEN: &str = "bad-token"; 34#[allow(dead_code)] 35pub const AUTH_DID: &str = "did:plc:fake"; 36#[allow(dead_code)] 37pub const TARGET_DID: &str = "did:plc:target"; 38fn has_external_infra() -> bool { 39 std::env::var("BSPDS_TEST_INFRA_READY").is_ok() 40 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok()) 41} 42#[cfg(test)] 43#[ctor::dtor] 44fn cleanup() { 45 if has_external_infra() { 46 return; 47 } 48 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 49 let _ = std::process::Command::new("podman") 50 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 51 .output(); 52 } 53 let _ = std::process::Command::new("docker") 54 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 55 .output(); 56} 57#[allow(dead_code)] 58pub fn client() -> Client { 59 Client::new() 60} 61#[allow(dead_code)] 62pub fn app_port() -> u16 { 63 *APP_PORT.get().expect("APP_PORT not initialized") 64} 65pub async fn base_url() -> &'static str { 66 SERVER_URL.get_or_init(|| { 67 let (tx, rx) = std::sync::mpsc::channel(); 68 std::thread::spawn(move || { 69 if std::env::var("DOCKER_HOST").is_err() { 70 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 71 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 72 if podman_sock.exists() { 73 unsafe { 74 std::env::set_var( 75 "DOCKER_HOST", 76 format!("unix://{}", podman_sock.display()), 77 ); 78 } 79 } 80 } 81 } 82 let rt = tokio::runtime::Runtime::new().unwrap(); 83 rt.block_on(async move { 84 if has_external_infra() { 85 let url = setup_with_external_infra().await; 86 tx.send(url).unwrap(); 87 } else { 88 let url = setup_with_testcontainers().await; 89 tx.send(url).unwrap(); 90 } 91 std::future::pending::<()>().await; 92 }); 93 }); 94 rx.recv().expect("Failed to start test server") 95 }) 96} 97async fn setup_with_external_infra() -> String { 98 let database_url = std::env::var("DATABASE_URL") 99 .expect("DATABASE_URL must be set when using external infra"); 100 let s3_endpoint = std::env::var("S3_ENDPOINT") 101 .expect("S3_ENDPOINT must be set when using external infra"); 102 unsafe { 103 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string())); 104 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string())); 105 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string())); 106 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string())); 107 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 108 } 109 let mock_server = MockServer::start().await; 110 setup_mock_appview(&mock_server).await; 111 unsafe { 112 std::env::set_var("APPVIEW_URL", mock_server.uri()); 113 } 114 MOCK_APPVIEW.set(mock_server).ok(); 115 spawn_app(database_url).await 116} 117#[cfg(not(feature = "external-infra"))] 118async fn setup_with_testcontainers() -> String { 119 let s3_container = GenericImage::new("minio/minio", "latest") 120 .with_exposed_port(ContainerPort::Tcp(9000)) 121 .with_env_var("MINIO_ROOT_USER", "minioadmin") 122 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 123 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 124 .with_label("bspds_test", "true") 125 .start() 126 .await 127 .expect("Failed to start MinIO"); 128 let s3_port = s3_container 129 .get_host_port_ipv4(9000) 130 .await 131 .expect("Failed to get S3 port"); 132 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 133 unsafe { 134 std::env::set_var("S3_BUCKET", "test-bucket"); 135 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 136 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 137 std::env::set_var("AWS_REGION", "us-east-1"); 138 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 139 } 140 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 141 .region("us-east-1") 142 .endpoint_url(&s3_endpoint) 143 .credentials_provider(Credentials::new( 144 "minioadmin", 145 "minioadmin", 146 None, 147 None, 148 "test", 149 )) 150 .load() 151 .await; 152 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 153 .force_path_style(true) 154 .build(); 155 let s3_client = S3Client::from_conf(s3_config); 156 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 157 let mock_server = MockServer::start().await; 158 setup_mock_appview(&mock_server).await; 159 unsafe { 160 std::env::set_var("APPVIEW_URL", mock_server.uri()); 161 } 162 MOCK_APPVIEW.set(mock_server).ok(); 163 S3_CONTAINER.set(s3_container).ok(); 164 let container = Postgres::default() 165 .with_tag("18-alpine") 166 .with_label("bspds_test", "true") 167 .start() 168 .await 169 .expect("Failed to start Postgres"); 170 let connection_string = format!( 171 "postgres://postgres:postgres@127.0.0.1:{}", 172 container 173 .get_host_port_ipv4(5432) 174 .await 175 .expect("Failed to get port") 176 ); 177 DB_CONTAINER.set(container).ok(); 178 spawn_app(connection_string).await 179} 180#[cfg(feature = "external-infra")] 181async fn setup_with_testcontainers() -> String { 182 panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."); 183} 184async fn setup_mock_appview(mock_server: &MockServer) { 185 Mock::given(method("GET")) 186 .and(path("/xrpc/app.bsky.actor.getProfile")) 187 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 188 "handle": "mock.handle", 189 "did": "did:plc:mock", 190 "displayName": "Mock User" 191 }))) 192 .mount(mock_server) 193 .await; 194 Mock::given(method("GET")) 195 .and(path("/xrpc/app.bsky.actor.searchActors")) 196 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 197 "actors": [], 198 "cursor": null 199 }))) 200 .mount(mock_server) 201 .await; 202 Mock::given(method("GET")) 203 .and(path("/xrpc/app.bsky.feed.getTimeline")) 204 .respond_with( 205 ResponseTemplate::new(200) 206 .insert_header("atproto-repo-rev", "0") 207 .set_body_json(json!({ 208 "feed": [], 209 "cursor": null 210 })) 211 ) 212 .mount(mock_server) 213 .await; 214 Mock::given(method("GET")) 215 .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 216 .respond_with( 217 ResponseTemplate::new(200) 218 .insert_header("atproto-repo-rev", "0") 219 .set_body_json(json!({ 220 "feed": [{ 221 "post": { 222 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 223 "cid": "bafyappview123", 224 "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 225 "record": { 226 "$type": "app.bsky.feed.post", 227 "text": "Author feed post from appview", 228 "createdAt": "2025-01-01T00:00:00Z" 229 }, 230 "indexedAt": "2025-01-01T00:00:00Z" 231 } 232 }], 233 "cursor": "author-cursor" 234 })), 235 ) 236 .mount(mock_server) 237 .await; 238 Mock::given(method("GET")) 239 .and(path("/xrpc/app.bsky.feed.getActorLikes")) 240 .respond_with( 241 ResponseTemplate::new(200) 242 .insert_header("atproto-repo-rev", "0") 243 .set_body_json(json!({ 244 "feed": [{ 245 "post": { 246 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 247 "cid": "bafyliked123", 248 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 249 "record": { 250 "$type": "app.bsky.feed.post", 251 "text": "Liked post from appview", 252 "createdAt": "2025-01-01T00:00:00Z" 253 }, 254 "indexedAt": "2025-01-01T00:00:00Z" 255 } 256 }], 257 "cursor": null 258 })), 259 ) 260 .mount(mock_server) 261 .await; 262 Mock::given(method("GET")) 263 .and(path("/xrpc/app.bsky.feed.getPostThread")) 264 .respond_with( 265 ResponseTemplate::new(200) 266 .insert_header("atproto-repo-rev", "0") 267 .set_body_json(json!({ 268 "thread": { 269 "$type": "app.bsky.feed.defs#threadViewPost", 270 "post": { 271 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 272 "cid": "bafythread123", 273 "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 274 "record": { 275 "$type": "app.bsky.feed.post", 276 "text": "Thread post from appview", 277 "createdAt": "2025-01-01T00:00:00Z" 278 }, 279 "indexedAt": "2025-01-01T00:00:00Z" 280 }, 281 "replies": [] 282 } 283 })), 284 ) 285 .mount(mock_server) 286 .await; 287 Mock::given(method("GET")) 288 .and(path("/xrpc/app.bsky.feed.getFeed")) 289 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 290 "feed": [{ 291 "post": { 292 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 293 "cid": "bafyfeed123", 294 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 295 "record": { 296 "$type": "app.bsky.feed.post", 297 "text": "Custom feed post from appview", 298 "createdAt": "2025-01-01T00:00:00Z" 299 }, 300 "indexedAt": "2025-01-01T00:00:00Z" 301 } 302 }], 303 "cursor": null 304 }))) 305 .mount(mock_server) 306 .await; 307 Mock::given(method("POST")) 308 .and(path("/xrpc/app.bsky.notification.registerPush")) 309 .respond_with(ResponseTemplate::new(200)) 310 .mount(mock_server) 311 .await; 312} 313async fn spawn_app(database_url: String) -> String { 314 use bspds::rate_limit::RateLimiters; 315 let pool = PgPoolOptions::new() 316 .max_connections(50) 317 .connect(&database_url) 318 .await 319 .expect("Failed to connect to Postgres. Make sure the database is running."); 320 sqlx::migrate!("./migrations") 321 .run(&pool) 322 .await 323 .expect("Failed to run migrations"); 324 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 325 let addr = listener.local_addr().unwrap(); 326 APP_PORT.set(addr.port()).ok(); 327 unsafe { 328 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 329 } 330 let rate_limiters = RateLimiters::new() 331 .with_login_limit(10000) 332 .with_account_creation_limit(10000) 333 .with_password_reset_limit(10000) 334 .with_email_update_limit(10000) 335 .with_oauth_authorize_limit(10000) 336 .with_oauth_token_limit(10000); 337 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters); 338 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 339 let app = bspds::app(state); 340 tokio::spawn(async move { 341 axum::serve(listener, app).await.unwrap(); 342 }); 343 format!("http://{}", addr) 344} 345#[allow(dead_code)] 346pub async fn get_db_connection_string() -> String { 347 base_url().await; 348 if has_external_infra() { 349 std::env::var("DATABASE_URL").expect("DATABASE_URL not set") 350 } else { 351 #[cfg(not(feature = "external-infra"))] 352 { 353 let container = DB_CONTAINER.get().expect("DB container not initialized"); 354 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port"); 355 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 356 } 357 #[cfg(feature = "external-infra")] 358 { 359 panic!("DATABASE_URL must be set with external-infra feature"); 360 } 361 } 362} 363#[allow(dead_code)] 364pub async fn verify_new_account(client: &Client, did: &str) -> String { 365 let conn_str = get_db_connection_string().await; 366 let pool = sqlx::postgres::PgPoolOptions::new() 367 .max_connections(2) 368 .connect(&conn_str) 369 .await 370 .expect("Failed to connect to test database"); 371 let verification_code: String = sqlx::query_scalar!( 372 "SELECT email_confirmation_code FROM users WHERE did = $1", 373 did 374 ) 375 .fetch_one(&pool) 376 .await 377 .expect("Failed to get verification code") 378 .expect("No verification code found"); 379 let confirm_payload = json!({ 380 "did": did, 381 "verificationCode": verification_code 382 }); 383 let confirm_res = client 384 .post(format!( 385 "{}/xrpc/com.atproto.server.confirmSignup", 386 base_url().await 387 )) 388 .json(&confirm_payload) 389 .send() 390 .await 391 .expect("confirmSignup request failed"); 392 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed"); 393 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 394 confirm_body["accessJwt"] 395 .as_str() 396 .expect("No accessJwt in confirmSignup response") 397 .to_string() 398} 399#[allow(dead_code)] 400pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 401 let res = client 402 .post(format!( 403 "{}/xrpc/com.atproto.repo.uploadBlob", 404 base_url().await 405 )) 406 .header(header::CONTENT_TYPE, mime) 407 .bearer_auth(AUTH_TOKEN) 408 .body(data) 409 .send() 410 .await 411 .expect("Failed to send uploadBlob request"); 412 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 413 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 414 body["blob"].clone() 415} 416#[allow(dead_code)] 417pub async fn create_test_post( 418 client: &Client, 419 text: &str, 420 reply_to: Option<Value>, 421) -> (String, String, String) { 422 let collection = "app.bsky.feed.post"; 423 let mut record = json!({ 424 "$type": collection, 425 "text": text, 426 "createdAt": Utc::now().to_rfc3339() 427 }); 428 if let Some(reply_obj) = reply_to { 429 record["reply"] = reply_obj; 430 } 431 let payload = json!({ 432 "repo": AUTH_DID, 433 "collection": collection, 434 "record": record 435 }); 436 let res = client 437 .post(format!( 438 "{}/xrpc/com.atproto.repo.createRecord", 439 base_url().await 440 )) 441 .bearer_auth(AUTH_TOKEN) 442 .json(&payload) 443 .send() 444 .await 445 .expect("Failed to send createRecord"); 446 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 447 let body: Value = res 448 .json() 449 .await 450 .expect("createRecord response was not JSON"); 451 let uri = body["uri"] 452 .as_str() 453 .expect("Response had no URI") 454 .to_string(); 455 let cid = body["cid"] 456 .as_str() 457 .expect("Response had no CID") 458 .to_string(); 459 let rkey = uri 460 .split('/') 461 .last() 462 .expect("URI was malformed") 463 .to_string(); 464 (uri, cid, rkey) 465} 466#[allow(dead_code)] 467pub async fn create_account_and_login(client: &Client) -> (String, String) { 468 let mut last_error = String::new(); 469 for attempt in 0..3 { 470 if attempt > 0 { 471 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 472 } 473 let handle = format!("user_{}", uuid::Uuid::new_v4()); 474 let payload = json!({ 475 "handle": handle, 476 "email": format!("{}@example.com", handle), 477 "password": "password" 478 }); 479 let res = match client 480 .post(format!( 481 "{}/xrpc/com.atproto.server.createAccount", 482 base_url().await 483 )) 484 .json(&payload) 485 .send() 486 .await 487 { 488 Ok(r) => r, 489 Err(e) => { 490 last_error = format!("Request failed: {}", e); 491 continue; 492 } 493 }; 494 if res.status() == StatusCode::OK { 495 let body: Value = res.json().await.expect("Invalid JSON"); 496 if let Some(access_jwt) = body["accessJwt"].as_str() { 497 let did = body["did"].as_str().expect("No did").to_string(); 498 return (access_jwt.to_string(), did); 499 } 500 let did = body["did"].as_str().expect("No did").to_string(); 501 let conn_str = get_db_connection_string().await; 502 let pool = sqlx::postgres::PgPoolOptions::new() 503 .max_connections(2) 504 .connect(&conn_str) 505 .await 506 .expect("Failed to connect to test database"); 507 let verification_code: String = sqlx::query_scalar!( 508 "SELECT email_confirmation_code FROM users WHERE did = $1", 509 &did 510 ) 511 .fetch_one(&pool) 512 .await 513 .expect("Failed to get verification code") 514 .expect("No verification code found"); 515 let confirm_payload = json!({ 516 "did": did, 517 "verificationCode": verification_code 518 }); 519 let confirm_res = client 520 .post(format!( 521 "{}/xrpc/com.atproto.server.confirmSignup", 522 base_url().await 523 )) 524 .json(&confirm_payload) 525 .send() 526 .await 527 .expect("confirmSignup request failed"); 528 if confirm_res.status() == StatusCode::OK { 529 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup"); 530 let access_jwt = confirm_body["accessJwt"] 531 .as_str() 532 .expect("No accessJwt in confirmSignup response") 533 .to_string(); 534 return (access_jwt, did); 535 } 536 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await); 537 continue; 538 } 539 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 540 } 541 panic!("Failed to create account after 3 attempts: {}", last_error); 542}