this repo has no description
1use aws_config::BehaviorVersion; 2use aws_sdk_s3::Client as S3Client; 3use aws_sdk_s3::config::Credentials; 4use bspds::state::AppState; 5use chrono::Utc; 6use reqwest::{Client, StatusCode, header}; 7use serde_json::{Value, json}; 8use sqlx::postgres::PgPoolOptions; 9#[allow(unused_imports)] 10use std::collections::HashMap; 11use std::sync::OnceLock; 12#[allow(unused_imports)] 13use std::time::Duration; 14use testcontainers::core::ContainerPort; 15use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner}; 16use testcontainers_modules::postgres::Postgres; 17use tokio::net::TcpListener; 18use wiremock::matchers::{method, path}; 19use wiremock::{Mock, MockServer, ResponseTemplate}; 20 21static SERVER_URL: OnceLock<String> = OnceLock::new(); 22static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 23static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 24static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 25 26#[allow(dead_code)] 27pub const AUTH_TOKEN: &str = "test-token"; 28#[allow(dead_code)] 29pub const BAD_AUTH_TOKEN: &str = "bad-token"; 30#[allow(dead_code)] 31pub const AUTH_DID: &str = "did:plc:fake"; 32#[allow(dead_code)] 33pub const TARGET_DID: &str = "did:plc:target"; 34 35#[cfg(test)] 36#[ctor::dtor] 37fn cleanup() { 38 // my attempt to force clean up containers created by this test binary. 39 // this is a fallback in case ryuk fails or is not supported 40 if std::env::var("XDG_RUNTIME_DIR").is_ok() { 41 let _ = std::process::Command::new("podman") 42 .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 43 .output(); 44 } 45 46 let _ = std::process::Command::new("docker") 47 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 48 .output(); 49} 50 51#[allow(dead_code)] 52pub fn client() -> Client { 53 Client::new() 54} 55 56pub async fn base_url() -> &'static str { 57 SERVER_URL.get_or_init(|| { 58 let (tx, rx) = std::sync::mpsc::channel(); 59 60 std::thread::spawn(move || { 61 if std::env::var("DOCKER_HOST").is_err() { 62 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") { 63 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock"); 64 if podman_sock.exists() { 65 unsafe { 66 std::env::set_var( 67 "DOCKER_HOST", 68 format!("unix://{}", podman_sock.display()), 69 ); 70 } 71 } 72 } 73 } 74 75 let rt = tokio::runtime::Runtime::new().unwrap(); 76 rt.block_on(async move { 77 let s3_container = GenericImage::new("minio/minio", "latest") 78 .with_exposed_port(ContainerPort::Tcp(9000)) 79 .with_env_var("MINIO_ROOT_USER", "minioadmin") 80 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 81 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 82 .with_label("bspds_test", "true") 83 .start() 84 .await 85 .expect("Failed to start MinIO"); 86 87 let s3_port = s3_container 88 .get_host_port_ipv4(9000) 89 .await 90 .expect("Failed to get S3 port"); 91 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 92 93 unsafe { 94 std::env::set_var("S3_BUCKET", "test-bucket"); 95 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); 96 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); 97 std::env::set_var("AWS_REGION", "us-east-1"); 98 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 99 } 100 101 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 102 .region("us-east-1") 103 .endpoint_url(&s3_endpoint) 104 .credentials_provider(Credentials::new( 105 "minioadmin", 106 "minioadmin", 107 None, 108 None, 109 "test", 110 )) 111 .load() 112 .await; 113 114 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) 115 .force_path_style(true) 116 .build(); 117 let s3_client = S3Client::from_conf(s3_config); 118 119 let _ = s3_client.create_bucket().bucket("test-bucket").send().await; 120 121 let mock_server = MockServer::start().await; 122 123 Mock::given(method("GET")) 124 .and(path("/xrpc/app.bsky.actor.getProfile")) 125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 126 "handle": "mock.handle", 127 "did": "did:plc:mock", 128 "displayName": "Mock User" 129 }))) 130 .mount(&mock_server) 131 .await; 132 133 Mock::given(method("GET")) 134 .and(path("/xrpc/app.bsky.actor.searchActors")) 135 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 136 "actors": [], 137 "cursor": null 138 }))) 139 .mount(&mock_server) 140 .await; 141 142 unsafe { 143 std::env::set_var("APPVIEW_URL", mock_server.uri()); 144 } 145 MOCK_APPVIEW.set(mock_server).ok(); 146 147 S3_CONTAINER.set(s3_container).ok(); 148 149 let container = Postgres::default() 150 .with_tag("18-alpine") 151 .with_label("bspds_test", "true") 152 .start() 153 .await 154 .expect("Failed to start Postgres"); 155 let connection_string = format!( 156 "postgres://postgres:postgres@127.0.0.1:{}/postgres", 157 container 158 .get_host_port_ipv4(5432) 159 .await 160 .expect("Failed to get port") 161 ); 162 163 DB_CONTAINER.set(container).ok(); 164 165 let url = spawn_app(connection_string).await; 166 tx.send(url).unwrap(); 167 std::future::pending::<()>().await; 168 }); 169 }); 170 171 rx.recv().expect("Failed to start test server") 172 }) 173} 174 175async fn spawn_app(database_url: String) -> String { 176 let pool = PgPoolOptions::new() 177 .max_connections(50) 178 .connect(&database_url) 179 .await 180 .expect("Failed to connect to Postgres. Make sure the database is running."); 181 182 sqlx::migrate!("./migrations") 183 .run(&pool) 184 .await 185 .expect("Failed to run migrations"); 186 187 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 188 let addr = listener.local_addr().unwrap(); 189 190 unsafe { 191 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 192 } 193 194 let state = AppState::new(pool).await; 195 let app = bspds::app(state); 196 197 tokio::spawn(async move { 198 axum::serve(listener, app).await.unwrap(); 199 }); 200 201 format!("http://{}", addr) 202} 203 204#[allow(dead_code)] 205pub async fn get_db_connection_string() -> String { 206 base_url().await; 207 let container = DB_CONTAINER.get().expect("DB container not initialized"); 208 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port"); 209 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port) 210} 211 212#[allow(dead_code)] 213pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value { 214 let res = client 215 .post(format!( 216 "{}/xrpc/com.atproto.repo.uploadBlob", 217 base_url().await 218 )) 219 .header(header::CONTENT_TYPE, mime) 220 .bearer_auth(AUTH_TOKEN) 221 .body(data) 222 .send() 223 .await 224 .expect("Failed to send uploadBlob request"); 225 226 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob"); 227 let body: Value = res.json().await.expect("Blob upload response was not JSON"); 228 body["blob"].clone() 229} 230 231#[allow(dead_code)] 232pub async fn create_test_post( 233 client: &Client, 234 text: &str, 235 reply_to: Option<Value>, 236) -> (String, String, String) { 237 let collection = "app.bsky.feed.post"; 238 let mut record = json!({ 239 "$type": collection, 240 "text": text, 241 "createdAt": Utc::now().to_rfc3339() 242 }); 243 244 if let Some(reply_obj) = reply_to { 245 record["reply"] = reply_obj; 246 } 247 248 let payload = json!({ 249 "repo": AUTH_DID, 250 "collection": collection, 251 "record": record 252 }); 253 254 let res = client 255 .post(format!( 256 "{}/xrpc/com.atproto.repo.createRecord", 257 base_url().await 258 )) 259 .bearer_auth(AUTH_TOKEN) 260 .json(&payload) 261 .send() 262 .await 263 .expect("Failed to send createRecord"); 264 265 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record"); 266 let body: Value = res 267 .json() 268 .await 269 .expect("createRecord response was not JSON"); 270 271 let uri = body["uri"] 272 .as_str() 273 .expect("Response had no URI") 274 .to_string(); 275 let cid = body["cid"] 276 .as_str() 277 .expect("Response had no CID") 278 .to_string(); 279 let rkey = uri 280 .split('/') 281 .last() 282 .expect("URI was malformed") 283 .to_string(); 284 285 (uri, cid, rkey) 286} 287 288#[allow(dead_code)] 289pub async fn create_account_and_login(client: &Client) -> (String, String) { 290 let mut last_error = String::new(); 291 292 for attempt in 0..3 { 293 if attempt > 0 { 294 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 295 } 296 297 let handle = format!("user_{}", uuid::Uuid::new_v4()); 298 let payload = json!({ 299 "handle": handle, 300 "email": format!("{}@example.com", handle), 301 "password": "password" 302 }); 303 304 let res = match client 305 .post(format!( 306 "{}/xrpc/com.atproto.server.createAccount", 307 base_url().await 308 )) 309 .json(&payload) 310 .send() 311 .await 312 { 313 Ok(r) => r, 314 Err(e) => { 315 last_error = format!("Request failed: {}", e); 316 continue; 317 } 318 }; 319 320 if res.status() == StatusCode::OK { 321 let body: Value = res.json().await.expect("Invalid JSON"); 322 let access_jwt = body["accessJwt"] 323 .as_str() 324 .expect("No accessJwt") 325 .to_string(); 326 let did = body["did"].as_str().expect("No did").to_string(); 327 return (access_jwt, did); 328 } 329 330 last_error = format!("Status {}: {:?}", res.status(), res.text().await); 331 } 332 333 panic!("Failed to create account after 3 attempts: {}", last_error); 334}