this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
18static SERVER_URL: OnceLock<String> = OnceLock::new();
19static APP_PORT: OnceLock<u16> = OnceLock::new();
20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
28#[cfg(not(feature = "external-infra"))]
29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
30#[cfg(not(feature = "external-infra"))]
31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
32
/// Bearer token sent by the request helpers in this module.
#[allow(dead_code)]
pub const AUTH_TOKEN: &str = "test-token";
/// A second token value; presumably used by tests that expect authentication to fail.
#[allow(dead_code)]
pub const BAD_AUTH_TOKEN: &str = "bad-token";
/// DID used as the `repo` when the helpers create records.
#[allow(dead_code)]
pub const AUTH_DID: &str = "did:plc:fake";
/// A second DID; presumably the subject of cross-account tests.
#[allow(dead_code)]
pub const TARGET_DID: &str = "did:plc:target";
41
/// Reports whether the tests should use externally provided infrastructure.
///
/// Returns `true` when `BSPDS_TEST_INFRA_READY` is set, or when both
/// `DATABASE_URL` and `S3_ENDPOINT` are set. A variable counts as set only if
/// `std::env::var` can read it (present and valid Unicode).
fn has_external_infra() -> bool {
    let is_set = |name: &str| std::env::var(name).is_ok();
    if is_set("BSPDS_TEST_INFRA_READY") {
        return true;
    }
    is_set("DATABASE_URL") && is_set("S3_ENDPOINT")
}
/// Process-exit hook that asks the container runtimes to remove containers
/// labelled `bspds_test=true`.
///
/// Does nothing when external infrastructure is in use. Every command is
/// best-effort: spawn failures and non-zero exit codes are ignored.
#[cfg(test)]
#[ctor::dtor]
fn cleanup() {
    if has_external_infra() {
        return;
    }
    // Run a command and discard its outcome entirely.
    let run_quietly = |program: &str, arguments: &[&str]| {
        let _ = std::process::Command::new(program)
            .args(arguments)
            .output();
    };
    // podman is only attempted when a user runtime directory is advertised.
    if std::env::var("XDG_RUNTIME_DIR").is_ok() {
        run_quietly("podman", &["rm", "-f", "--filter", "label=bspds_test=true"]);
    }
    run_quietly(
        "docker",
        &[
            "container",
            "prune",
            "-f",
            "--filter",
            "label=bspds_test=true",
        ],
    );
}
67
68#[allow(dead_code)]
69pub fn client() -> Client {
70 Client::new()
71}
72
73#[allow(dead_code)]
74pub fn app_port() -> u16 {
75 *APP_PORT.get().expect("APP_PORT not initialized")
76}
77
78pub async fn base_url() -> &'static str {
79 SERVER_URL.get_or_init(|| {
80 let (tx, rx) = std::sync::mpsc::channel();
81 std::thread::spawn(move || {
82 if std::env::var("DOCKER_HOST").is_err() {
83 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
84 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
85 if podman_sock.exists() {
86 unsafe {
87 std::env::set_var(
88 "DOCKER_HOST",
89 format!("unix://{}", podman_sock.display()),
90 );
91 }
92 }
93 }
94 }
95 let rt = tokio::runtime::Runtime::new().unwrap();
96 rt.block_on(async move {
97 if has_external_infra() {
98 let url = setup_with_external_infra().await;
99 tx.send(url).unwrap();
100 } else {
101 let url = setup_with_testcontainers().await;
102 tx.send(url).unwrap();
103 }
104 std::future::pending::<()>().await;
105 });
106 });
107 rx.recv().expect("Failed to start test server")
108 })
109}
110
111async fn setup_with_external_infra() -> String {
112 let database_url =
113 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
114 let s3_endpoint =
115 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra");
116 unsafe {
117 std::env::set_var(
118 "S3_BUCKET",
119 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
120 );
121 std::env::set_var(
122 "AWS_ACCESS_KEY_ID",
123 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
124 );
125 std::env::set_var(
126 "AWS_SECRET_ACCESS_KEY",
127 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
128 );
129 std::env::set_var(
130 "AWS_REGION",
131 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
132 );
133 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
134 }
135 let mock_server = MockServer::start().await;
136 setup_mock_appview(&mock_server).await;
137 unsafe {
138 std::env::set_var("APPVIEW_URL", mock_server.uri());
139 }
140 MOCK_APPVIEW.set(mock_server).ok();
141 spawn_app(database_url).await
142}
143
144#[cfg(not(feature = "external-infra"))]
145async fn setup_with_testcontainers() -> String {
146 let s3_container = GenericImage::new("minio/minio", "latest")
147 .with_exposed_port(ContainerPort::Tcp(9000))
148 .with_env_var("MINIO_ROOT_USER", "minioadmin")
149 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
150 .with_cmd(vec!["server".to_string(), "/data".to_string()])
151 .with_label("bspds_test", "true")
152 .start()
153 .await
154 .expect("Failed to start MinIO");
155 let s3_port = s3_container
156 .get_host_port_ipv4(9000)
157 .await
158 .expect("Failed to get S3 port");
159 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
160 unsafe {
161 std::env::set_var("S3_BUCKET", "test-bucket");
162 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
163 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
164 std::env::set_var("AWS_REGION", "us-east-1");
165 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
166 }
167 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
168 .region("us-east-1")
169 .endpoint_url(&s3_endpoint)
170 .credentials_provider(Credentials::new(
171 "minioadmin",
172 "minioadmin",
173 None,
174 None,
175 "test",
176 ))
177 .load()
178 .await;
179 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
180 .force_path_style(true)
181 .build();
182 let s3_client = S3Client::from_conf(s3_config);
183 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
184 let mock_server = MockServer::start().await;
185 setup_mock_appview(&mock_server).await;
186 unsafe {
187 std::env::set_var("APPVIEW_URL", mock_server.uri());
188 }
189 MOCK_APPVIEW.set(mock_server).ok();
190 S3_CONTAINER.set(s3_container).ok();
191 let container = Postgres::default()
192 .with_tag("18-alpine")
193 .with_label("bspds_test", "true")
194 .start()
195 .await
196 .expect("Failed to start Postgres");
197 let connection_string = format!(
198 "postgres://postgres:postgres@127.0.0.1:{}",
199 container
200 .get_host_port_ipv4(5432)
201 .await
202 .expect("Failed to get port")
203 );
204 DB_CONTAINER.set(container).ok();
205 spawn_app(connection_string).await
206}
207
/// Stand-in used when the `external-infra` feature is enabled: containers are
/// unavailable in that build, so reaching this function is always an error.
///
/// # Panics
///
/// Always panics, telling the user to set `DATABASE_URL` and `S3_ENDPOINT`.
#[cfg(feature = "external-infra")]
async fn setup_with_testcontainers() -> String {
    panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT.")
}
214
215async fn setup_mock_appview(mock_server: &MockServer) {
216 Mock::given(method("GET"))
217 .and(path("/xrpc/app.bsky.actor.getProfile"))
218 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
219 "handle": "mock.handle",
220 "did": "did:plc:mock",
221 "displayName": "Mock User"
222 })))
223 .mount(mock_server)
224 .await;
225 Mock::given(method("GET"))
226 .and(path("/xrpc/app.bsky.actor.searchActors"))
227 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
228 "actors": [],
229 "cursor": null
230 })))
231 .mount(mock_server)
232 .await;
233 Mock::given(method("GET"))
234 .and(path("/xrpc/app.bsky.feed.getTimeline"))
235 .respond_with(
236 ResponseTemplate::new(200)
237 .insert_header("atproto-repo-rev", "0")
238 .set_body_json(json!({
239 "feed": [],
240 "cursor": null
241 })),
242 )
243 .mount(mock_server)
244 .await;
245 Mock::given(method("GET"))
246 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
247 .respond_with(
248 ResponseTemplate::new(200)
249 .insert_header("atproto-repo-rev", "0")
250 .set_body_json(json!({
251 "feed": [{
252 "post": {
253 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
254 "cid": "bafyappview123",
255 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
256 "record": {
257 "$type": "app.bsky.feed.post",
258 "text": "Author feed post from appview",
259 "createdAt": "2025-01-01T00:00:00Z"
260 },
261 "indexedAt": "2025-01-01T00:00:00Z"
262 }
263 }],
264 "cursor": "author-cursor"
265 })),
266 )
267 .mount(mock_server)
268 .await;
269 Mock::given(method("GET"))
270 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
271 .respond_with(
272 ResponseTemplate::new(200)
273 .insert_header("atproto-repo-rev", "0")
274 .set_body_json(json!({
275 "feed": [{
276 "post": {
277 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
278 "cid": "bafyliked123",
279 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
280 "record": {
281 "$type": "app.bsky.feed.post",
282 "text": "Liked post from appview",
283 "createdAt": "2025-01-01T00:00:00Z"
284 },
285 "indexedAt": "2025-01-01T00:00:00Z"
286 }
287 }],
288 "cursor": null
289 })),
290 )
291 .mount(mock_server)
292 .await;
293 Mock::given(method("GET"))
294 .and(path("/xrpc/app.bsky.feed.getPostThread"))
295 .respond_with(
296 ResponseTemplate::new(200)
297 .insert_header("atproto-repo-rev", "0")
298 .set_body_json(json!({
299 "thread": {
300 "$type": "app.bsky.feed.defs#threadViewPost",
301 "post": {
302 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
303 "cid": "bafythread123",
304 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
305 "record": {
306 "$type": "app.bsky.feed.post",
307 "text": "Thread post from appview",
308 "createdAt": "2025-01-01T00:00:00Z"
309 },
310 "indexedAt": "2025-01-01T00:00:00Z"
311 },
312 "replies": []
313 }
314 })),
315 )
316 .mount(mock_server)
317 .await;
318 Mock::given(method("GET"))
319 .and(path("/xrpc/app.bsky.feed.getFeed"))
320 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
321 "feed": [{
322 "post": {
323 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
324 "cid": "bafyfeed123",
325 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
326 "record": {
327 "$type": "app.bsky.feed.post",
328 "text": "Custom feed post from appview",
329 "createdAt": "2025-01-01T00:00:00Z"
330 },
331 "indexedAt": "2025-01-01T00:00:00Z"
332 }
333 }],
334 "cursor": null
335 })))
336 .mount(mock_server)
337 .await;
338 Mock::given(method("POST"))
339 .and(path("/xrpc/app.bsky.notification.registerPush"))
340 .respond_with(ResponseTemplate::new(200))
341 .mount(mock_server)
342 .await;
343}
344
345async fn spawn_app(database_url: String) -> String {
346 use bspds::rate_limit::RateLimiters;
347 let pool = PgPoolOptions::new()
348 .max_connections(50)
349 .connect(&database_url)
350 .await
351 .expect("Failed to connect to Postgres. Make sure the database is running.");
352 sqlx::migrate!("./migrations")
353 .run(&pool)
354 .await
355 .expect("Failed to run migrations");
356 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
357 let addr = listener.local_addr().unwrap();
358 APP_PORT.set(addr.port()).ok();
359 unsafe {
360 std::env::set_var("PDS_HOSTNAME", addr.to_string());
361 }
362 let rate_limiters = RateLimiters::new()
363 .with_login_limit(10000)
364 .with_account_creation_limit(10000)
365 .with_password_reset_limit(10000)
366 .with_email_update_limit(10000)
367 .with_oauth_authorize_limit(10000)
368 .with_oauth_token_limit(10000);
369 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
370 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
371 let app = bspds::app(state);
372 tokio::spawn(async move {
373 axum::serve(listener, app).await.unwrap();
374 });
375 format!("http://{}", addr)
376}
377
378#[allow(dead_code)]
379pub async fn get_db_connection_string() -> String {
380 base_url().await;
381 if has_external_infra() {
382 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
383 } else {
384 #[cfg(not(feature = "external-infra"))]
385 {
386 let container = DB_CONTAINER.get().expect("DB container not initialized");
387 let port = container
388 .get_host_port_ipv4(5432)
389 .await
390 .expect("Failed to get port");
391 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
392 }
393 #[cfg(feature = "external-infra")]
394 {
395 panic!("DATABASE_URL must be set with external-infra feature");
396 }
397 }
398}
399
400#[allow(dead_code)]
401pub async fn verify_new_account(client: &Client, did: &str) -> String {
402 let conn_str = get_db_connection_string().await;
403 let pool = sqlx::postgres::PgPoolOptions::new()
404 .max_connections(2)
405 .connect(&conn_str)
406 .await
407 .expect("Failed to connect to test database");
408 let verification_code: String = sqlx::query_scalar!(
409 "SELECT email_confirmation_code FROM users WHERE did = $1",
410 did
411 )
412 .fetch_one(&pool)
413 .await
414 .expect("Failed to get verification code")
415 .expect("No verification code found");
416 let confirm_payload = json!({
417 "did": did,
418 "verificationCode": verification_code
419 });
420 let confirm_res = client
421 .post(format!(
422 "{}/xrpc/com.atproto.server.confirmSignup",
423 base_url().await
424 ))
425 .json(&confirm_payload)
426 .send()
427 .await
428 .expect("confirmSignup request failed");
429 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
430 let confirm_body: Value = confirm_res
431 .json()
432 .await
433 .expect("Invalid JSON from confirmSignup");
434 confirm_body["accessJwt"]
435 .as_str()
436 .expect("No accessJwt in confirmSignup response")
437 .to_string()
438}
439
440#[allow(dead_code)]
441pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
442 let res = client
443 .post(format!(
444 "{}/xrpc/com.atproto.repo.uploadBlob",
445 base_url().await
446 ))
447 .header(header::CONTENT_TYPE, mime)
448 .bearer_auth(AUTH_TOKEN)
449 .body(data)
450 .send()
451 .await
452 .expect("Failed to send uploadBlob request");
453 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
454 let body: Value = res.json().await.expect("Blob upload response was not JSON");
455 body["blob"].clone()
456}
457
458#[allow(dead_code)]
459pub async fn create_test_post(
460 client: &Client,
461 text: &str,
462 reply_to: Option<Value>,
463) -> (String, String, String) {
464 let collection = "app.bsky.feed.post";
465 let mut record = json!({
466 "$type": collection,
467 "text": text,
468 "createdAt": Utc::now().to_rfc3339()
469 });
470 if let Some(reply_obj) = reply_to {
471 record["reply"] = reply_obj;
472 }
473 let payload = json!({
474 "repo": AUTH_DID,
475 "collection": collection,
476 "record": record
477 });
478 let res = client
479 .post(format!(
480 "{}/xrpc/com.atproto.repo.createRecord",
481 base_url().await
482 ))
483 .bearer_auth(AUTH_TOKEN)
484 .json(&payload)
485 .send()
486 .await
487 .expect("Failed to send createRecord");
488 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
489 let body: Value = res
490 .json()
491 .await
492 .expect("createRecord response was not JSON");
493 let uri = body["uri"]
494 .as_str()
495 .expect("Response had no URI")
496 .to_string();
497 let cid = body["cid"]
498 .as_str()
499 .expect("Response had no CID")
500 .to_string();
501 let rkey = uri
502 .split('/')
503 .last()
504 .expect("URI was malformed")
505 .to_string();
506 (uri, cid, rkey)
507}
508
509#[allow(dead_code)]
510pub async fn create_account_and_login(client: &Client) -> (String, String) {
511 let mut last_error = String::new();
512 for attempt in 0..3 {
513 if attempt > 0 {
514 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
515 }
516 let handle = format!("user_{}", uuid::Uuid::new_v4());
517 let payload = json!({
518 "handle": handle,
519 "email": format!("{}@example.com", handle),
520 "password": "password"
521 });
522 let res = match client
523 .post(format!(
524 "{}/xrpc/com.atproto.server.createAccount",
525 base_url().await
526 ))
527 .json(&payload)
528 .send()
529 .await
530 {
531 Ok(r) => r,
532 Err(e) => {
533 last_error = format!("Request failed: {}", e);
534 continue;
535 }
536 };
537 if res.status() == StatusCode::OK {
538 let body: Value = res.json().await.expect("Invalid JSON");
539 if let Some(access_jwt) = body["accessJwt"].as_str() {
540 let did = body["did"].as_str().expect("No did").to_string();
541 return (access_jwt.to_string(), did);
542 }
543 let did = body["did"].as_str().expect("No did").to_string();
544 let conn_str = get_db_connection_string().await;
545 let pool = sqlx::postgres::PgPoolOptions::new()
546 .max_connections(2)
547 .connect(&conn_str)
548 .await
549 .expect("Failed to connect to test database");
550 let verification_code: String = sqlx::query_scalar!(
551 "SELECT email_confirmation_code FROM users WHERE did = $1",
552 &did
553 )
554 .fetch_one(&pool)
555 .await
556 .expect("Failed to get verification code")
557 .expect("No verification code found");
558 let confirm_payload = json!({
559 "did": did,
560 "verificationCode": verification_code
561 });
562 let confirm_res = client
563 .post(format!(
564 "{}/xrpc/com.atproto.server.confirmSignup",
565 base_url().await
566 ))
567 .json(&confirm_payload)
568 .send()
569 .await
570 .expect("confirmSignup request failed");
571 if confirm_res.status() == StatusCode::OK {
572 let confirm_body: Value = confirm_res
573 .json()
574 .await
575 .expect("Invalid JSON from confirmSignup");
576 let access_jwt = confirm_body["accessJwt"]
577 .as_str()
578 .expect("No accessJwt in confirmSignup response")
579 .to_string();
580 return (access_jwt, did);
581 }
582 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
583 continue;
584 }
585 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
586 }
587 panic!("Failed to create account after 3 attempts: {}", last_error);
588}