this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
18static SERVER_URL: OnceLock<String> = OnceLock::new();
19static APP_PORT: OnceLock<u16> = OnceLock::new();
20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
28
29#[cfg(not(feature = "external-infra"))]
30static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
31#[cfg(not(feature = "external-infra"))]
32static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
33
34#[allow(dead_code)]
35pub const AUTH_TOKEN: &str = "test-token";
36#[allow(dead_code)]
37pub const BAD_AUTH_TOKEN: &str = "bad-token";
38#[allow(dead_code)]
39pub const AUTH_DID: &str = "did:plc:fake";
40#[allow(dead_code)]
41pub const TARGET_DID: &str = "did:plc:target";
42
43fn has_external_infra() -> bool {
44 std::env::var("BSPDS_TEST_INFRA_READY").is_ok()
45 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok())
46}
47
48#[cfg(test)]
49#[ctor::dtor]
50fn cleanup() {
51 if has_external_infra() {
52 return;
53 }
54
55 if std::env::var("XDG_RUNTIME_DIR").is_ok() {
56 let _ = std::process::Command::new("podman")
57 .args(&["rm", "-f", "--filter", "label=bspds_test=true"])
58 .output();
59 }
60
61 let _ = std::process::Command::new("docker")
62 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"])
63 .output();
64}
65
66#[allow(dead_code)]
67pub fn client() -> Client {
68 Client::new()
69}
70
71#[allow(dead_code)]
72pub fn app_port() -> u16 {
73 *APP_PORT.get().expect("APP_PORT not initialized")
74}
75
76pub async fn base_url() -> &'static str {
77 SERVER_URL.get_or_init(|| {
78 let (tx, rx) = std::sync::mpsc::channel();
79
80 std::thread::spawn(move || {
81 if std::env::var("DOCKER_HOST").is_err() {
82 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
83 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
84 if podman_sock.exists() {
85 unsafe {
86 std::env::set_var(
87 "DOCKER_HOST",
88 format!("unix://{}", podman_sock.display()),
89 );
90 }
91 }
92 }
93 }
94
95 let rt = tokio::runtime::Runtime::new().unwrap();
96 rt.block_on(async move {
97 if has_external_infra() {
98 let url = setup_with_external_infra().await;
99 tx.send(url).unwrap();
100 } else {
101 let url = setup_with_testcontainers().await;
102 tx.send(url).unwrap();
103 }
104 std::future::pending::<()>().await;
105 });
106 });
107
108 rx.recv().expect("Failed to start test server")
109 })
110}
111
112async fn setup_with_external_infra() -> String {
113 let database_url = std::env::var("DATABASE_URL")
114 .expect("DATABASE_URL must be set when using external infra");
115 let s3_endpoint = std::env::var("S3_ENDPOINT")
116 .expect("S3_ENDPOINT must be set when using external infra");
117
118 unsafe {
119 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()));
120 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()));
121 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()));
122 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()));
123 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
124 }
125
126 let mock_server = MockServer::start().await;
127 setup_mock_appview(&mock_server).await;
128
129 unsafe {
130 std::env::set_var("APPVIEW_URL", mock_server.uri());
131 }
132 MOCK_APPVIEW.set(mock_server).ok();
133
134 spawn_app(database_url).await
135}
136
137#[cfg(not(feature = "external-infra"))]
138async fn setup_with_testcontainers() -> String {
139 let s3_container = GenericImage::new("minio/minio", "latest")
140 .with_exposed_port(ContainerPort::Tcp(9000))
141 .with_env_var("MINIO_ROOT_USER", "minioadmin")
142 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
143 .with_cmd(vec!["server".to_string(), "/data".to_string()])
144 .with_label("bspds_test", "true")
145 .start()
146 .await
147 .expect("Failed to start MinIO");
148
149 let s3_port = s3_container
150 .get_host_port_ipv4(9000)
151 .await
152 .expect("Failed to get S3 port");
153 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
154
155 unsafe {
156 std::env::set_var("S3_BUCKET", "test-bucket");
157 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
158 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
159 std::env::set_var("AWS_REGION", "us-east-1");
160 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
161 }
162
163 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
164 .region("us-east-1")
165 .endpoint_url(&s3_endpoint)
166 .credentials_provider(Credentials::new(
167 "minioadmin",
168 "minioadmin",
169 None,
170 None,
171 "test",
172 ))
173 .load()
174 .await;
175
176 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
177 .force_path_style(true)
178 .build();
179 let s3_client = S3Client::from_conf(s3_config);
180
181 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
182
183 let mock_server = MockServer::start().await;
184 setup_mock_appview(&mock_server).await;
185
186 unsafe {
187 std::env::set_var("APPVIEW_URL", mock_server.uri());
188 }
189 MOCK_APPVIEW.set(mock_server).ok();
190
191 S3_CONTAINER.set(s3_container).ok();
192
193 let container = Postgres::default()
194 .with_tag("18-alpine")
195 .with_label("bspds_test", "true")
196 .start()
197 .await
198 .expect("Failed to start Postgres");
199 let connection_string = format!(
200 "postgres://postgres:postgres@127.0.0.1:{}",
201 container
202 .get_host_port_ipv4(5432)
203 .await
204 .expect("Failed to get port")
205 );
206
207 DB_CONTAINER.set(container).ok();
208
209 spawn_app(connection_string).await
210}
211
212#[cfg(feature = "external-infra")]
213async fn setup_with_testcontainers() -> String {
214 panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT.");
215}
216
217async fn setup_mock_appview(mock_server: &MockServer) {
218 Mock::given(method("GET"))
219 .and(path("/xrpc/app.bsky.actor.getProfile"))
220 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
221 "handle": "mock.handle",
222 "did": "did:plc:mock",
223 "displayName": "Mock User"
224 })))
225 .mount(mock_server)
226 .await;
227
228 Mock::given(method("GET"))
229 .and(path("/xrpc/app.bsky.actor.searchActors"))
230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
231 "actors": [],
232 "cursor": null
233 })))
234 .mount(mock_server)
235 .await;
236}
237
238async fn spawn_app(database_url: String) -> String {
239 let pool = PgPoolOptions::new()
240 .max_connections(50)
241 .connect(&database_url)
242 .await
243 .expect("Failed to connect to Postgres. Make sure the database is running.");
244
245 sqlx::migrate!("./migrations")
246 .run(&pool)
247 .await
248 .expect("Failed to run migrations");
249
250 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
251 let addr = listener.local_addr().unwrap();
252 APP_PORT.set(addr.port()).ok();
253
254 unsafe {
255 std::env::set_var("PDS_HOSTNAME", addr.to_string());
256 }
257
258 let state = AppState::new(pool).await;
259
260 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
261
262 let app = bspds::app(state);
263
264 tokio::spawn(async move {
265 axum::serve(listener, app).await.unwrap();
266 });
267
268 format!("http://{}", addr)
269}
270
271#[allow(dead_code)]
272pub async fn get_db_connection_string() -> String {
273 base_url().await;
274
275 if has_external_infra() {
276 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
277 } else {
278 #[cfg(not(feature = "external-infra"))]
279 {
280 let container = DB_CONTAINER.get().expect("DB container not initialized");
281 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
282 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
283 }
284 #[cfg(feature = "external-infra")]
285 {
286 panic!("DATABASE_URL must be set with external-infra feature");
287 }
288 }
289}
290
291#[allow(dead_code)]
292pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
293 let res = client
294 .post(format!(
295 "{}/xrpc/com.atproto.repo.uploadBlob",
296 base_url().await
297 ))
298 .header(header::CONTENT_TYPE, mime)
299 .bearer_auth(AUTH_TOKEN)
300 .body(data)
301 .send()
302 .await
303 .expect("Failed to send uploadBlob request");
304
305 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
306 let body: Value = res.json().await.expect("Blob upload response was not JSON");
307 body["blob"].clone()
308}
309
310#[allow(dead_code)]
311pub async fn create_test_post(
312 client: &Client,
313 text: &str,
314 reply_to: Option<Value>,
315) -> (String, String, String) {
316 let collection = "app.bsky.feed.post";
317 let mut record = json!({
318 "$type": collection,
319 "text": text,
320 "createdAt": Utc::now().to_rfc3339()
321 });
322
323 if let Some(reply_obj) = reply_to {
324 record["reply"] = reply_obj;
325 }
326
327 let payload = json!({
328 "repo": AUTH_DID,
329 "collection": collection,
330 "record": record
331 });
332
333 let res = client
334 .post(format!(
335 "{}/xrpc/com.atproto.repo.createRecord",
336 base_url().await
337 ))
338 .bearer_auth(AUTH_TOKEN)
339 .json(&payload)
340 .send()
341 .await
342 .expect("Failed to send createRecord");
343
344 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
345 let body: Value = res
346 .json()
347 .await
348 .expect("createRecord response was not JSON");
349
350 let uri = body["uri"]
351 .as_str()
352 .expect("Response had no URI")
353 .to_string();
354 let cid = body["cid"]
355 .as_str()
356 .expect("Response had no CID")
357 .to_string();
358 let rkey = uri
359 .split('/')
360 .last()
361 .expect("URI was malformed")
362 .to_string();
363
364 (uri, cid, rkey)
365}
366
367#[allow(dead_code)]
368pub async fn create_account_and_login(client: &Client) -> (String, String) {
369 let mut last_error = String::new();
370
371 for attempt in 0..3 {
372 if attempt > 0 {
373 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
374 }
375
376 let handle = format!("user_{}", uuid::Uuid::new_v4());
377 let payload = json!({
378 "handle": handle,
379 "email": format!("{}@example.com", handle),
380 "password": "password"
381 });
382
383 let res = match client
384 .post(format!(
385 "{}/xrpc/com.atproto.server.createAccount",
386 base_url().await
387 ))
388 .json(&payload)
389 .send()
390 .await
391 {
392 Ok(r) => r,
393 Err(e) => {
394 last_error = format!("Request failed: {}", e);
395 continue;
396 }
397 };
398
399 if res.status() == StatusCode::OK {
400 let body: Value = res.json().await.expect("Invalid JSON");
401 let access_jwt = body["accessJwt"]
402 .as_str()
403 .expect("No accessJwt")
404 .to_string();
405 let did = body["did"].as_str().expect("No did").to_string();
406 return (access_jwt, did);
407 }
408
409 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
410 }
411
412 panic!("Failed to create account after 3 attempts: {}", last_error);
413}