this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use testcontainers::core::ContainerPort; 15use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 16use testcontainers_modules::postgres::Postgres; 17use tokio::net::TcpListener; 18use wiremock::matchers::{method, path}; 19use wiremock::{Mock, MockServer, ResponseTemplate}; 20 21static SERVER_URL: OnceLock<String> = OnceLock::new(); 22static APP_PORT: OnceLock<u16> = OnceLock::new(); 23static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 24static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 25static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 26 27#[allow(dead_code)] 28pub const AUTH_TOKEN: &str = "test-token"; 29#[allow(dead_code)] 30pub const BAD_AUTH_TOKEN: &str = "bad-token"; 31#[allow(dead_code)] 32pub const AUTH_DID: &str = "did:plc:fake"; 33#[allow(dead_code)] 34pub const TARGET_DID: &str = "did:plc:target"; 35 36#[cfg(test)] 37#[ctor::dtor] 38fn cleanup() { 39 // my attempt to force clean up containers created by this test binary. 40 // this is a fallback in case ryuk fails or is not supported 41 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 42 let _ = std::process::Command::new("podman") 43 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 44 .output(); 45 } 46 47 let _ = std::process::Command::new("docker") 48 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 49 .output(); 50} 51 52#[allow(dead_code)] 53pub fn client() -> Client { 54 Client::new() 55} 56 57#[allow(dead_code)] 58pub fn app_port() -> u16 { 59 *APP_PORT.get().expect("APP_PORT not initialized") 60} 61 62pub async fn base_url() -> &'static str { 63 SERVER_URL.get_or_init(|| { 64 let (tx, rx) = std::sync::mpsc::channel(); 65 66 std::thread::spawn(move || { 67 if std::env::var("DOCKER_HOST").is_err() { 68 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 69 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 70 if podman_sock.exists() { 71 unsafe { 72 std::env::set_var( 73 "DOCKER_HOST", 74 format!("unix://{}", podman_sock.display()), 75 ); 76 } 77 } 78 } 79 } 80 81 let rt = tokio::runtime::Runtime::new().unwrap(); 82 rt.block_on(async move { 83 let s3_container = GenericImage::new("minio/minio", "latest") 84 .with_exposed_port(ContainerPort::Tcp(9000)) 85 .with_env_var("MINIO_ROOT_USER", "minioadmin") 86 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 87 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 88 .with_label("bspds_test", "true") 89 .start() 90 .await 91 .expect("Failed to start MinIO"); 92 93 let s3_port = s3_container 94 .get_host_port_ipv4(9000) 95 .await 96 .expect("Failed to get S3 port"); 97 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 98 99 unsafe { 100 std::env::set_var("S3_BUCKET", "test-bucket"); 101 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 102 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 103 std::env::set_var("AWS_REGION", "us-east-1"); 104 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 105 } 106 107 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 108 .region("us-east-1") 109 .endpoint_url(&s3_endpoint) 110 .credentials_provider(Credentials::new( 111 "minioadmin", 112 "minioadmin", 113 None, 114 None, 115 "test", 116 )) 117 .load() 118 .await; 119 120 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 121 .force_path_style(true) 122 .build(); 123 let s3_client = S3Client::from_conf(s3_config); 124 125 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 126 127 let mock_server = MockServer::start().await; 128 129 Mock::given(method("GET")) 130 .and(path("/xrpc/app.bsky.actor.getProfile")) 131 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 132 "handle": "mock.handle", 133 "did": "did:plc:mock", 134 "displayName": "Mock User" 135 }))) 136 .mount(&mock_server) 137 .await; 138 139 Mock::given(method("GET")) 140 .and(path("/xrpc/app.bsky.actor.searchActors")) 141 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 142 "actors": [], 143 "cursor": null 144 }))) 145 .mount(&mock_server) 146 .await; 147 148 unsafe { 149 std::env::set_var("APPVIEW_URL", mock_server.uri()); 150 } 151 MOCK_APPVIEW.set(mock_server).ok(); 152 153 S3_CONTAINER.set(s3_container).ok(); 154 155 let container = Postgres::default() 156 .with_tag("18-alpine") 157 .with_label("bspds_test", "true") 158 .start() 159 .await 160 .expect("Failed to start Postgres"); 161 let connection_string = format!( 162 "postgres://postgres:postgres@127.0.0.1:{}", 163 container 164 .get_host_port_ipv4(5432) 165 .await 166 .expect("Failed to get port") 167 ); 168 169 DB_CONTAINER.set(container).ok(); 170 171 let url = spawn_app(connection_string).await; 172 tx.send(url).unwrap(); 173 std::future::pending::<()>().await; 174 }); 175 }); 176 177 rx.recv().expect("Failed to start test server") 178 }) 179} 180 181async fn spawn_app(database_url: String) -> String { 182 let pool = PgPoolOptions::new() 183 .max_connections(50) 184 .connect(&database_url) 185 .await 186 .expect("Failed to connect to Postgres. Make sure the database is running."); 187 188 sqlx::migrate!("./migrations") 189 .run(&pool) 190 .await 191 .expect("Failed to run migrations"); 192 193 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 194 let addr = listener.local_addr().unwrap(); 195 APP_PORT.set(addr.port()).ok(); 196 197 unsafe { 198 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 199 } 200 201 let state = AppState::new(pool).await; 202 203 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 204 205 let app = bspds::app(state); 206 207 tokio::spawn(async move { 208 axum::serve(listener, app).await.unwrap(); 209 }); 210 211 format!("http://{}", addr) 212} 213 214#[allow(dead_code)] 215pub async fn get_db_connection_string() -> String { 216 base_url().await; 217 let container = DB_CONTAINER.get().expect("DB container not initialized"); 218 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port"); 219 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 220} 221 222#[allow(dead_code)] 223pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 224 let res = client 225 .post(format!( 226 "{}/xrpc/com.atproto.repo.uploadBlob", 227 base_url().await 228 )) 229 .header(header::CONTENT_TYPE, mime) 230 .bearer_auth(AUTH_TOKEN) 231 .body(data) 232 .send() 233 .await 234 .expect("Failed to send uploadBlob request"); 235 236 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 237 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 238 body["blob"].clone() 239} 240 241#[allow(dead_code)] 242pub async fn create_test_post( 243 client: &Client, 244 text: &str, 245 reply_to: Option<Value>, 246) -> (String, String, String) { 247 let collection = "app.bsky.feed.post"; 248 let mut record = json!({ 249 "$type": collection, 250 "text": text, 251 "createdAt": Utc::now().to_rfc3339() 252 }); 253 254 if let Some(reply_obj) = reply_to { 255 record["reply"] = reply_obj; 256 } 257 258 let payload = json!({ 259 "repo": AUTH_DID, 260 "collection": collection, 261 "record": record 262 }); 263 264 let res = client 265 .post(format!( 266 "{}/xrpc/com.atproto.repo.createRecord", 267 base_url().await 268 )) 269 .bearer_auth(AUTH_TOKEN) 270 .json(&payload) 271 .send() 272 .await 273 .expect("Failed to send createRecord"); 274 275 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 276 let body: Value = res 277 .json() 278 .await 279 .expect("createRecord response was not JSON"); 280 281 let uri = body["uri"] 282 .as_str() 283 .expect("Response had no URI") 284 .to_string(); 285 let cid = body["cid"] 286 .as_str() 287 .expect("Response had no CID") 288 .to_string(); 289 let rkey = uri 290 .split('/') 291 .last() 292 .expect("URI was malformed") 293 .to_string(); 294 295 (uri, cid, rkey) 296} 297 298#[allow(dead_code)] 299pub async fn create_account_and_login(client: &Client) -> (String, String) { 300 let mut last_error = String::new(); 301 302 for attempt in 0..3 { 303 if attempt > 0 { 304 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 305 } 306 307 let handle = format!("user_{}", uuid::Uuid::new_v4()); 308 let payload = json!({ 309 "handle": handle, 310 "email": format!("{}@example.com", handle), 311 "password": "password" 312 }); 313 314 let res = match client 315 .post(format!( 316 "{}/xrpc/com.atproto.server.createAccount", 317 base_url().await 318 )) 319 .json(&payload) 320 .send() 321 .await 322 { 323 Ok(r) => r, 324 Err(e) => { 325 last_error = format!("Request failed: {}", e); 326 continue; 327 } 328 }; 329 330 if res.status() == StatusCode::OK { 331 let body: Value = res.json().await.expect("Invalid JSON"); 332 let access_jwt = body["accessJwt"] 333 .as_str() 334 .expect("No accessJwt") 335 .to_string(); 336 let did = body["did"].as_str().expect("No did").to_string(); 337 return (access_jwt, did); 338 } 339 340 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 341 } 342 343 panic!("Failed to create account after 3 attempts: {}", last_error); 344}