this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
// Base URL of the spawned test server; filled in exactly once by `base_url()`.
static SERVER_URL: OnceLock<String> = OnceLock::new();
// Port the test server is bound to; written by `spawn_app` right after binding.
static APP_PORT: OnceLock<u16> = OnceLock::new();
// Mock AppView instance stored by the setup functions once its routes are mounted
// (presumably kept here so the mock server is not dropped — TODO confirm).
static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
// Postgres container handle, stored by `setup_with_testcontainers` and read back
// by `get_db_connection_string` to look up the mapped host port.
#[cfg(not(feature = "external-infra"))]
static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
// MinIO container handle, stored by `setup_with_testcontainers`
// (presumably only to keep the container alive — TODO confirm).
#[cfg(not(feature = "external-infra"))]
static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
32
// NOTE(review): the `dead_code` allowances are presumably needed because each test
// binary that includes this module uses only a subset of the items — confirm.

// Bearer token sent by `upload_test_blob` and `create_test_post`.
#[allow(dead_code)]
pub const AUTH_TOKEN: &str = "test-token";
// Not used in this file; presumably a token the server is expected to reject — verify against callers.
#[allow(dead_code)]
pub const BAD_AUTH_TOKEN: &str = "bad-token";
// DID used as the `repo` field when `create_test_post` creates a record.
#[allow(dead_code)]
pub const AUTH_DID: &str = "did:plc:fake";
// Not used in this file; presumably the DID of a second account targeted by tests — verify against callers.
#[allow(dead_code)]
pub const TARGET_DID: &str = "did:plc:target";
41
/// Reports whether the tests should use externally provided infrastructure.
///
/// True when `BSPDS_TEST_INFRA_READY` is set, or when both `DATABASE_URL` and
/// `S3_ENDPOINT` are set. Only presence (and valid Unicode) of the variables
/// is checked, not their contents.
fn has_external_infra() -> bool {
    let is_set = |key: &str| std::env::var(key).is_ok();
    is_set("BSPDS_TEST_INFRA_READY") || (is_set("DATABASE_URL") && is_set("S3_ENDPOINT"))
}
/// Process-exit hook that removes containers labelled `bspds_test=true`.
///
/// Does nothing when external infrastructure is in use. Every command is
/// best-effort: spawn failures and non-zero exit codes are ignored.
#[cfg(test)]
#[ctor::dtor]
fn cleanup() {
    if has_external_infra() {
        return;
    }

    // Run a command and discard its outcome entirely.
    let run_quietly = |program: &str, arguments: &[&str]| {
        let _ = std::process::Command::new(program)
            .args(arguments)
            .output();
    };

    // podman is only attempted when a per-user runtime directory is advertised.
    if std::env::var("XDG_RUNTIME_DIR").is_ok() {
        run_quietly("podman", &["rm", "-f", "--filter", "label=bspds_test=true"]);
    }
    run_quietly(
        "docker",
        &["container", "prune", "-f", "--filter", "label=bspds_test=true"],
    );
}
61
62#[allow(dead_code)]
63pub fn client() -> Client {
64 Client::new()
65}
66
67#[allow(dead_code)]
68pub fn app_port() -> u16 {
69 *APP_PORT.get().expect("APP_PORT not initialized")
70}
71
72pub async fn base_url() -> &'static str {
73 SERVER_URL.get_or_init(|| {
74 let (tx, rx) = std::sync::mpsc::channel();
75 std::thread::spawn(move || {
76 if std::env::var("DOCKER_HOST").is_err() {
77 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
78 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
79 if podman_sock.exists() {
80 unsafe {
81 std::env::set_var(
82 "DOCKER_HOST",
83 format!("unix://{}", podman_sock.display()),
84 );
85 }
86 }
87 }
88 }
89 let rt = tokio::runtime::Runtime::new().unwrap();
90 rt.block_on(async move {
91 if has_external_infra() {
92 let url = setup_with_external_infra().await;
93 tx.send(url).unwrap();
94 } else {
95 let url = setup_with_testcontainers().await;
96 tx.send(url).unwrap();
97 }
98 std::future::pending::<()>().await;
99 });
100 });
101 rx.recv().expect("Failed to start test server")
102 })
103}
104
105async fn setup_with_external_infra() -> String {
106 let database_url = std::env::var("DATABASE_URL")
107 .expect("DATABASE_URL must be set when using external infra");
108 let s3_endpoint = std::env::var("S3_ENDPOINT")
109 .expect("S3_ENDPOINT must be set when using external infra");
110 unsafe {
111 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()));
112 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()));
113 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()));
114 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()));
115 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
116 }
117 let mock_server = MockServer::start().await;
118 setup_mock_appview(&mock_server).await;
119 unsafe {
120 std::env::set_var("APPVIEW_URL", mock_server.uri());
121 }
122 MOCK_APPVIEW.set(mock_server).ok();
123 spawn_app(database_url).await
124}
125
126#[cfg(not(feature = "external-infra"))]
127async fn setup_with_testcontainers() -> String {
128 let s3_container = GenericImage::new("minio/minio", "latest")
129 .with_exposed_port(ContainerPort::Tcp(9000))
130 .with_env_var("MINIO_ROOT_USER", "minioadmin")
131 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
132 .with_cmd(vec!["server".to_string(), "/data".to_string()])
133 .with_label("bspds_test", "true")
134 .start()
135 .await
136 .expect("Failed to start MinIO");
137 let s3_port = s3_container
138 .get_host_port_ipv4(9000)
139 .await
140 .expect("Failed to get S3 port");
141 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
142 unsafe {
143 std::env::set_var("S3_BUCKET", "test-bucket");
144 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
145 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
146 std::env::set_var("AWS_REGION", "us-east-1");
147 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
148 }
149 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
150 .region("us-east-1")
151 .endpoint_url(&s3_endpoint)
152 .credentials_provider(Credentials::new(
153 "minioadmin",
154 "minioadmin",
155 None,
156 None,
157 "test",
158 ))
159 .load()
160 .await;
161 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
162 .force_path_style(true)
163 .build();
164 let s3_client = S3Client::from_conf(s3_config);
165 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
166 let mock_server = MockServer::start().await;
167 setup_mock_appview(&mock_server).await;
168 unsafe {
169 std::env::set_var("APPVIEW_URL", mock_server.uri());
170 }
171 MOCK_APPVIEW.set(mock_server).ok();
172 S3_CONTAINER.set(s3_container).ok();
173 let container = Postgres::default()
174 .with_tag("18-alpine")
175 .with_label("bspds_test", "true")
176 .start()
177 .await
178 .expect("Failed to start Postgres");
179 let connection_string = format!(
180 "postgres://postgres:postgres@127.0.0.1:{}",
181 container
182 .get_host_port_ipv4(5432)
183 .await
184 .expect("Failed to get port")
185 );
186 DB_CONTAINER.set(container).ok();
187 spawn_app(connection_string).await
188}
189
/// Stand-in used when the `external-infra` feature is enabled: container-based
/// setup is unavailable, so reaching this function always panics.
#[cfg(feature = "external-infra")]
async fn setup_with_testcontainers() -> String {
    panic!(
        "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."
    );
}
194
195async fn setup_mock_appview(mock_server: &MockServer) {
196 Mock::given(method("GET"))
197 .and(path("/xrpc/app.bsky.actor.getProfile"))
198 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
199 "handle": "mock.handle",
200 "did": "did:plc:mock",
201 "displayName": "Mock User"
202 })))
203 .mount(mock_server)
204 .await;
205 Mock::given(method("GET"))
206 .and(path("/xrpc/app.bsky.actor.searchActors"))
207 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
208 "actors": [],
209 "cursor": null
210 })))
211 .mount(mock_server)
212 .await;
213 Mock::given(method("GET"))
214 .and(path("/xrpc/app.bsky.feed.getTimeline"))
215 .respond_with(
216 ResponseTemplate::new(200)
217 .insert_header("atproto-repo-rev", "0")
218 .set_body_json(json!({
219 "feed": [],
220 "cursor": null
221 }))
222 )
223 .mount(mock_server)
224 .await;
225 Mock::given(method("GET"))
226 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
227 .respond_with(
228 ResponseTemplate::new(200)
229 .insert_header("atproto-repo-rev", "0")
230 .set_body_json(json!({
231 "feed": [{
232 "post": {
233 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
234 "cid": "bafyappview123",
235 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
236 "record": {
237 "$type": "app.bsky.feed.post",
238 "text": "Author feed post from appview",
239 "createdAt": "2025-01-01T00:00:00Z"
240 },
241 "indexedAt": "2025-01-01T00:00:00Z"
242 }
243 }],
244 "cursor": "author-cursor"
245 })),
246 )
247 .mount(mock_server)
248 .await;
249 Mock::given(method("GET"))
250 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
251 .respond_with(
252 ResponseTemplate::new(200)
253 .insert_header("atproto-repo-rev", "0")
254 .set_body_json(json!({
255 "feed": [{
256 "post": {
257 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
258 "cid": "bafyliked123",
259 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
260 "record": {
261 "$type": "app.bsky.feed.post",
262 "text": "Liked post from appview",
263 "createdAt": "2025-01-01T00:00:00Z"
264 },
265 "indexedAt": "2025-01-01T00:00:00Z"
266 }
267 }],
268 "cursor": null
269 })),
270 )
271 .mount(mock_server)
272 .await;
273 Mock::given(method("GET"))
274 .and(path("/xrpc/app.bsky.feed.getPostThread"))
275 .respond_with(
276 ResponseTemplate::new(200)
277 .insert_header("atproto-repo-rev", "0")
278 .set_body_json(json!({
279 "thread": {
280 "$type": "app.bsky.feed.defs#threadViewPost",
281 "post": {
282 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
283 "cid": "bafythread123",
284 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
285 "record": {
286 "$type": "app.bsky.feed.post",
287 "text": "Thread post from appview",
288 "createdAt": "2025-01-01T00:00:00Z"
289 },
290 "indexedAt": "2025-01-01T00:00:00Z"
291 },
292 "replies": []
293 }
294 })),
295 )
296 .mount(mock_server)
297 .await;
298 Mock::given(method("GET"))
299 .and(path("/xrpc/app.bsky.feed.getFeed"))
300 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
301 "feed": [{
302 "post": {
303 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
304 "cid": "bafyfeed123",
305 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
306 "record": {
307 "$type": "app.bsky.feed.post",
308 "text": "Custom feed post from appview",
309 "createdAt": "2025-01-01T00:00:00Z"
310 },
311 "indexedAt": "2025-01-01T00:00:00Z"
312 }
313 }],
314 "cursor": null
315 })))
316 .mount(mock_server)
317 .await;
318 Mock::given(method("POST"))
319 .and(path("/xrpc/app.bsky.notification.registerPush"))
320 .respond_with(ResponseTemplate::new(200))
321 .mount(mock_server)
322 .await;
323}
324
325async fn spawn_app(database_url: String) -> String {
326 use bspds::rate_limit::RateLimiters;
327 let pool = PgPoolOptions::new()
328 .max_connections(50)
329 .connect(&database_url)
330 .await
331 .expect("Failed to connect to Postgres. Make sure the database is running.");
332 sqlx::migrate!("./migrations")
333 .run(&pool)
334 .await
335 .expect("Failed to run migrations");
336 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
337 let addr = listener.local_addr().unwrap();
338 APP_PORT.set(addr.port()).ok();
339 unsafe {
340 std::env::set_var("PDS_HOSTNAME", addr.to_string());
341 }
342 let rate_limiters = RateLimiters::new()
343 .with_login_limit(10000)
344 .with_account_creation_limit(10000)
345 .with_password_reset_limit(10000)
346 .with_email_update_limit(10000)
347 .with_oauth_authorize_limit(10000)
348 .with_oauth_token_limit(10000);
349 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
350 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
351 let app = bspds::app(state);
352 tokio::spawn(async move {
353 axum::serve(listener, app).await.unwrap();
354 });
355 format!("http://{}", addr)
356}
357
358#[allow(dead_code)]
359pub async fn get_db_connection_string() -> String {
360 base_url().await;
361 if has_external_infra() {
362 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
363 } else {
364 #[cfg(not(feature = "external-infra"))]
365 {
366 let container = DB_CONTAINER.get().expect("DB container not initialized");
367 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
368 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
369 }
370 #[cfg(feature = "external-infra")]
371 {
372 panic!("DATABASE_URL must be set with external-infra feature");
373 }
374 }
375}
376
377#[allow(dead_code)]
378pub async fn verify_new_account(client: &Client, did: &str) -> String {
379 let conn_str = get_db_connection_string().await;
380 let pool = sqlx::postgres::PgPoolOptions::new()
381 .max_connections(2)
382 .connect(&conn_str)
383 .await
384 .expect("Failed to connect to test database");
385 let verification_code: String = sqlx::query_scalar!(
386 "SELECT email_confirmation_code FROM users WHERE did = $1",
387 did
388 )
389 .fetch_one(&pool)
390 .await
391 .expect("Failed to get verification code")
392 .expect("No verification code found");
393 let confirm_payload = json!({
394 "did": did,
395 "verificationCode": verification_code
396 });
397 let confirm_res = client
398 .post(format!(
399 "{}/xrpc/com.atproto.server.confirmSignup",
400 base_url().await
401 ))
402 .json(&confirm_payload)
403 .send()
404 .await
405 .expect("confirmSignup request failed");
406 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
407 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup");
408 confirm_body["accessJwt"]
409 .as_str()
410 .expect("No accessJwt in confirmSignup response")
411 .to_string()
412}
413
414#[allow(dead_code)]
415pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
416 let res = client
417 .post(format!(
418 "{}/xrpc/com.atproto.repo.uploadBlob",
419 base_url().await
420 ))
421 .header(header::CONTENT_TYPE, mime)
422 .bearer_auth(AUTH_TOKEN)
423 .body(data)
424 .send()
425 .await
426 .expect("Failed to send uploadBlob request");
427 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
428 let body: Value = res.json().await.expect("Blob upload response was not JSON");
429 body["blob"].clone()
430}
431
432#[allow(dead_code)]
433pub async fn create_test_post(
434 client: &Client,
435 text: &str,
436 reply_to: Option<Value>,
437) -> (String, String, String) {
438 let collection = "app.bsky.feed.post";
439 let mut record = json!({
440 "$type": collection,
441 "text": text,
442 "createdAt": Utc::now().to_rfc3339()
443 });
444 if let Some(reply_obj) = reply_to {
445 record["reply"] = reply_obj;
446 }
447 let payload = json!({
448 "repo": AUTH_DID,
449 "collection": collection,
450 "record": record
451 });
452 let res = client
453 .post(format!(
454 "{}/xrpc/com.atproto.repo.createRecord",
455 base_url().await
456 ))
457 .bearer_auth(AUTH_TOKEN)
458 .json(&payload)
459 .send()
460 .await
461 .expect("Failed to send createRecord");
462 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
463 let body: Value = res
464 .json()
465 .await
466 .expect("createRecord response was not JSON");
467 let uri = body["uri"]
468 .as_str()
469 .expect("Response had no URI")
470 .to_string();
471 let cid = body["cid"]
472 .as_str()
473 .expect("Response had no CID")
474 .to_string();
475 let rkey = uri
476 .split('/')
477 .last()
478 .expect("URI was malformed")
479 .to_string();
480 (uri, cid, rkey)
481}
482
483#[allow(dead_code)]
484pub async fn create_account_and_login(client: &Client) -> (String, String) {
485 let mut last_error = String::new();
486 for attempt in 0..3 {
487 if attempt > 0 {
488 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
489 }
490 let handle = format!("user_{}", uuid::Uuid::new_v4());
491 let payload = json!({
492 "handle": handle,
493 "email": format!("{}@example.com", handle),
494 "password": "password"
495 });
496 let res = match client
497 .post(format!(
498 "{}/xrpc/com.atproto.server.createAccount",
499 base_url().await
500 ))
501 .json(&payload)
502 .send()
503 .await
504 {
505 Ok(r) => r,
506 Err(e) => {
507 last_error = format!("Request failed: {}", e);
508 continue;
509 }
510 };
511 if res.status() == StatusCode::OK {
512 let body: Value = res.json().await.expect("Invalid JSON");
513 if let Some(access_jwt) = body["accessJwt"].as_str() {
514 let did = body["did"].as_str().expect("No did").to_string();
515 return (access_jwt.to_string(), did);
516 }
517 let did = body["did"].as_str().expect("No did").to_string();
518 let conn_str = get_db_connection_string().await;
519 let pool = sqlx::postgres::PgPoolOptions::new()
520 .max_connections(2)
521 .connect(&conn_str)
522 .await
523 .expect("Failed to connect to test database");
524 let verification_code: String = sqlx::query_scalar!(
525 "SELECT email_confirmation_code FROM users WHERE did = $1",
526 &did
527 )
528 .fetch_one(&pool)
529 .await
530 .expect("Failed to get verification code")
531 .expect("No verification code found");
532 let confirm_payload = json!({
533 "did": did,
534 "verificationCode": verification_code
535 });
536 let confirm_res = client
537 .post(format!(
538 "{}/xrpc/com.atproto.server.confirmSignup",
539 base_url().await
540 ))
541 .json(&confirm_payload)
542 .send()
543 .await
544 .expect("confirmSignup request failed");
545 if confirm_res.status() == StatusCode::OK {
546 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup");
547 let access_jwt = confirm_body["accessJwt"]
548 .as_str()
549 .expect("No accessJwt in confirmSignup response")
550 .to_string();
551 return (access_jwt, did);
552 }
553 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
554 continue;
555 }
556 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
557 }
558 panic!("Failed to create account after 3 attempts: {}", last_error);
559}