this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
18static SERVER_URL: OnceLock<String> = OnceLock::new();
19static APP_PORT: OnceLock<u16> = OnceLock::new();
20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
28#[cfg(not(feature = "external-infra"))]
29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
30#[cfg(not(feature = "external-infra"))]
31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
32
33#[allow(dead_code)]
34pub const AUTH_TOKEN: &str = "test-token";
35#[allow(dead_code)]
36pub const BAD_AUTH_TOKEN: &str = "bad-token";
37#[allow(dead_code)]
38pub const AUTH_DID: &str = "did:plc:fake";
39#[allow(dead_code)]
40pub const TARGET_DID: &str = "did:plc:target";
41
42fn has_external_infra() -> bool {
43 std::env::var("BSPDS_TEST_INFRA_READY").is_ok()
44 || (std::env::var("DATABASE_URL").is_ok() && std::env::var("S3_ENDPOINT").is_ok())
45}
46#[cfg(test)]
47#[ctor::dtor]
48fn cleanup() {
49 if has_external_infra() {
50 return;
51 }
52 if std::env::var("XDG_RUNTIME_DIR").is_ok() {
53 let _ = std::process::Command::new("podman")
54 .args(&["rm", "-f", "--filter", "label=bspds_test=true"])
55 .output();
56 }
57 let _ = std::process::Command::new("docker")
58 .args(&[
59 "container",
60 "prune",
61 "-f",
62 "--filter",
63 "label=bspds_test=true",
64 ])
65 .output();
66}
67
68#[allow(dead_code)]
69pub fn client() -> Client {
70 Client::new()
71}
72
73#[allow(dead_code)]
74pub fn app_port() -> u16 {
75 *APP_PORT.get().expect("APP_PORT not initialized")
76}
77
78pub async fn base_url() -> &'static str {
79 SERVER_URL.get_or_init(|| {
80 let (tx, rx) = std::sync::mpsc::channel();
81 std::thread::spawn(move || {
82 unsafe {
83 std::env::set_var("BSPDS_ALLOW_INSECURE_SECRETS", "1");
84 }
85 if std::env::var("DOCKER_HOST").is_err() {
86 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
87 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
88 if podman_sock.exists() {
89 unsafe {
90 std::env::set_var(
91 "DOCKER_HOST",
92 format!("unix://{}", podman_sock.display()),
93 );
94 }
95 }
96 }
97 }
98 let rt = tokio::runtime::Runtime::new().unwrap();
99 rt.block_on(async move {
100 if has_external_infra() {
101 let url = setup_with_external_infra().await;
102 tx.send(url).unwrap();
103 } else {
104 let url = setup_with_testcontainers().await;
105 tx.send(url).unwrap();
106 }
107 std::future::pending::<()>().await;
108 });
109 });
110 rx.recv().expect("Failed to start test server")
111 })
112}
113
114async fn setup_with_external_infra() -> String {
115 let database_url =
116 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
117 let s3_endpoint =
118 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra");
119 unsafe {
120 std::env::set_var(
121 "S3_BUCKET",
122 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
123 );
124 std::env::set_var(
125 "AWS_ACCESS_KEY_ID",
126 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
127 );
128 std::env::set_var(
129 "AWS_SECRET_ACCESS_KEY",
130 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
131 );
132 std::env::set_var(
133 "AWS_REGION",
134 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
135 );
136 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
137 }
138 let mock_server = MockServer::start().await;
139 setup_mock_appview(&mock_server).await;
140 let mock_uri = mock_server.uri();
141 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
142 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
143 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
144 unsafe {
145 std::env::set_var("APPVIEW_DID_APP_BSKY", &mock_did);
146 }
147 MOCK_APPVIEW.set(mock_server).ok();
148 spawn_app(database_url).await
149}
150
151#[cfg(not(feature = "external-infra"))]
152async fn setup_with_testcontainers() -> String {
153 let s3_container = GenericImage::new("minio/minio", "latest")
154 .with_exposed_port(ContainerPort::Tcp(9000))
155 .with_env_var("MINIO_ROOT_USER", "minioadmin")
156 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
157 .with_cmd(vec!["server".to_string(), "/data".to_string()])
158 .with_label("bspds_test", "true")
159 .start()
160 .await
161 .expect("Failed to start MinIO");
162 let s3_port = s3_container
163 .get_host_port_ipv4(9000)
164 .await
165 .expect("Failed to get S3 port");
166 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
167 unsafe {
168 std::env::set_var("S3_BUCKET", "test-bucket");
169 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
170 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
171 std::env::set_var("AWS_REGION", "us-east-1");
172 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
173 }
174 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
175 .region("us-east-1")
176 .endpoint_url(&s3_endpoint)
177 .credentials_provider(Credentials::new(
178 "minioadmin",
179 "minioadmin",
180 None,
181 None,
182 "test",
183 ))
184 .load()
185 .await;
186 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
187 .force_path_style(true)
188 .build();
189 let s3_client = S3Client::from_conf(s3_config);
190 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
191 let mock_server = MockServer::start().await;
192 setup_mock_appview(&mock_server).await;
193 let mock_uri = mock_server.uri();
194 let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
195 let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
196 setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
197 unsafe {
198 std::env::set_var("APPVIEW_DID_APP_BSKY", &mock_did);
199 }
200 MOCK_APPVIEW.set(mock_server).ok();
201 S3_CONTAINER.set(s3_container).ok();
202 let container = Postgres::default()
203 .with_tag("18-alpine")
204 .with_label("bspds_test", "true")
205 .start()
206 .await
207 .expect("Failed to start Postgres");
208 let connection_string = format!(
209 "postgres://postgres:postgres@127.0.0.1:{}",
210 container
211 .get_host_port_ipv4(5432)
212 .await
213 .expect("Failed to get port")
214 );
215 DB_CONTAINER.set(container).ok();
216 spawn_app(connection_string).await
217}
218
219#[cfg(feature = "external-infra")]
220async fn setup_with_testcontainers() -> String {
221 panic!(
222 "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT."
223 );
224}
225
226async fn setup_mock_did_document(mock_server: &MockServer, did: &str, service_endpoint: &str) {
227 Mock::given(method("GET"))
228 .and(path("/.well-known/did.json"))
229 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
230 "id": did,
231 "service": [{
232 "id": "#atproto_appview",
233 "type": "AtprotoAppView",
234 "serviceEndpoint": service_endpoint
235 }]
236 })))
237 .mount(mock_server)
238 .await;
239}
240
241async fn setup_mock_appview(mock_server: &MockServer) {
242 Mock::given(method("GET"))
243 .and(path("/xrpc/app.bsky.actor.getProfile"))
244 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
245 "handle": "mock.handle",
246 "did": "did:plc:mock",
247 "displayName": "Mock User"
248 })))
249 .mount(mock_server)
250 .await;
251 Mock::given(method("GET"))
252 .and(path("/xrpc/app.bsky.actor.searchActors"))
253 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
254 "actors": [],
255 "cursor": null
256 })))
257 .mount(mock_server)
258 .await;
259 Mock::given(method("GET"))
260 .and(path("/xrpc/app.bsky.feed.getTimeline"))
261 .respond_with(
262 ResponseTemplate::new(200)
263 .insert_header("atproto-repo-rev", "0")
264 .set_body_json(json!({
265 "feed": [],
266 "cursor": null
267 })),
268 )
269 .mount(mock_server)
270 .await;
271 Mock::given(method("GET"))
272 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
273 .respond_with(
274 ResponseTemplate::new(200)
275 .insert_header("atproto-repo-rev", "0")
276 .set_body_json(json!({
277 "feed": [{
278 "post": {
279 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
280 "cid": "bafyappview123",
281 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
282 "record": {
283 "$type": "app.bsky.feed.post",
284 "text": "Author feed post from appview",
285 "createdAt": "2025-01-01T00:00:00Z"
286 },
287 "indexedAt": "2025-01-01T00:00:00Z"
288 }
289 }],
290 "cursor": "author-cursor"
291 })),
292 )
293 .mount(mock_server)
294 .await;
295 Mock::given(method("GET"))
296 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
297 .respond_with(
298 ResponseTemplate::new(200)
299 .insert_header("atproto-repo-rev", "0")
300 .set_body_json(json!({
301 "feed": [{
302 "post": {
303 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
304 "cid": "bafyliked123",
305 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
306 "record": {
307 "$type": "app.bsky.feed.post",
308 "text": "Liked post from appview",
309 "createdAt": "2025-01-01T00:00:00Z"
310 },
311 "indexedAt": "2025-01-01T00:00:00Z"
312 }
313 }],
314 "cursor": null
315 })),
316 )
317 .mount(mock_server)
318 .await;
319 Mock::given(method("GET"))
320 .and(path("/xrpc/app.bsky.feed.getPostThread"))
321 .respond_with(
322 ResponseTemplate::new(200)
323 .insert_header("atproto-repo-rev", "0")
324 .set_body_json(json!({
325 "thread": {
326 "$type": "app.bsky.feed.defs#threadViewPost",
327 "post": {
328 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
329 "cid": "bafythread123",
330 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
331 "record": {
332 "$type": "app.bsky.feed.post",
333 "text": "Thread post from appview",
334 "createdAt": "2025-01-01T00:00:00Z"
335 },
336 "indexedAt": "2025-01-01T00:00:00Z"
337 },
338 "replies": []
339 }
340 })),
341 )
342 .mount(mock_server)
343 .await;
344 Mock::given(method("GET"))
345 .and(path("/xrpc/app.bsky.feed.getFeed"))
346 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
347 "feed": [{
348 "post": {
349 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
350 "cid": "bafyfeed123",
351 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
352 "record": {
353 "$type": "app.bsky.feed.post",
354 "text": "Custom feed post from appview",
355 "createdAt": "2025-01-01T00:00:00Z"
356 },
357 "indexedAt": "2025-01-01T00:00:00Z"
358 }
359 }],
360 "cursor": null
361 })))
362 .mount(mock_server)
363 .await;
364 Mock::given(method("POST"))
365 .and(path("/xrpc/app.bsky.notification.registerPush"))
366 .respond_with(ResponseTemplate::new(200))
367 .mount(mock_server)
368 .await;
369}
370
371async fn spawn_app(database_url: String) -> String {
372 use bspds::rate_limit::RateLimiters;
373 let pool = PgPoolOptions::new()
374 .max_connections(50)
375 .connect(&database_url)
376 .await
377 .expect("Failed to connect to Postgres. Make sure the database is running.");
378 sqlx::migrate!("./migrations")
379 .run(&pool)
380 .await
381 .expect("Failed to run migrations");
382 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
383 let addr = listener.local_addr().unwrap();
384 APP_PORT.set(addr.port()).ok();
385 unsafe {
386 std::env::set_var("PDS_HOSTNAME", addr.to_string());
387 }
388 let rate_limiters = RateLimiters::new()
389 .with_login_limit(10000)
390 .with_account_creation_limit(10000)
391 .with_password_reset_limit(10000)
392 .with_email_update_limit(10000)
393 .with_oauth_authorize_limit(10000)
394 .with_oauth_token_limit(10000);
395 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
396 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
397 let app = bspds::app(state);
398 tokio::spawn(async move {
399 axum::serve(listener, app).await.unwrap();
400 });
401 format!("http://{}", addr)
402}
403
404#[allow(dead_code)]
405pub async fn get_db_connection_string() -> String {
406 base_url().await;
407 if has_external_infra() {
408 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
409 } else {
410 #[cfg(not(feature = "external-infra"))]
411 {
412 let container = DB_CONTAINER.get().expect("DB container not initialized");
413 let port = container
414 .get_host_port_ipv4(5432)
415 .await
416 .expect("Failed to get port");
417 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
418 }
419 #[cfg(feature = "external-infra")]
420 {
421 panic!("DATABASE_URL must be set with external-infra feature");
422 }
423 }
424}
425
426#[allow(dead_code)]
427pub async fn verify_new_account(client: &Client, did: &str) -> String {
428 let conn_str = get_db_connection_string().await;
429 let pool = sqlx::postgres::PgPoolOptions::new()
430 .max_connections(2)
431 .connect(&conn_str)
432 .await
433 .expect("Failed to connect to test database");
434 let verification_code: String = sqlx::query_scalar!(
435 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'",
436 did
437 )
438 .fetch_one(&pool)
439 .await
440 .expect("Failed to get verification code");
441
442 let confirm_payload = json!({
443 "did": did,
444 "verificationCode": verification_code
445 });
446 let confirm_res = client
447 .post(format!(
448 "{}/xrpc/com.atproto.server.confirmSignup",
449 base_url().await
450 ))
451 .json(&confirm_payload)
452 .send()
453 .await
454 .expect("confirmSignup request failed");
455 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
456 let confirm_body: Value = confirm_res
457 .json()
458 .await
459 .expect("Invalid JSON from confirmSignup");
460 confirm_body["accessJwt"]
461 .as_str()
462 .expect("No accessJwt in confirmSignup response")
463 .to_string()
464}
465
466#[allow(dead_code)]
467pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
468 let res = client
469 .post(format!(
470 "{}/xrpc/com.atproto.repo.uploadBlob",
471 base_url().await
472 ))
473 .header(header::CONTENT_TYPE, mime)
474 .bearer_auth(AUTH_TOKEN)
475 .body(data)
476 .send()
477 .await
478 .expect("Failed to send uploadBlob request");
479 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
480 let body: Value = res.json().await.expect("Blob upload response was not JSON");
481 body["blob"].clone()
482}
483
484#[allow(dead_code)]
485pub async fn create_test_post(
486 client: &Client,
487 text: &str,
488 reply_to: Option<Value>,
489) -> (String, String, String) {
490 let collection = "app.bsky.feed.post";
491 let mut record = json!({
492 "$type": collection,
493 "text": text,
494 "createdAt": Utc::now().to_rfc3339()
495 });
496 if let Some(reply_obj) = reply_to {
497 record["reply"] = reply_obj;
498 }
499 let payload = json!({
500 "repo": AUTH_DID,
501 "collection": collection,
502 "record": record
503 });
504 let res = client
505 .post(format!(
506 "{}/xrpc/com.atproto.repo.createRecord",
507 base_url().await
508 ))
509 .bearer_auth(AUTH_TOKEN)
510 .json(&payload)
511 .send()
512 .await
513 .expect("Failed to send createRecord");
514 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
515 let body: Value = res
516 .json()
517 .await
518 .expect("createRecord response was not JSON");
519 let uri = body["uri"]
520 .as_str()
521 .expect("Response had no URI")
522 .to_string();
523 let cid = body["cid"]
524 .as_str()
525 .expect("Response had no CID")
526 .to_string();
527 let rkey = uri
528 .split('/')
529 .last()
530 .expect("URI was malformed")
531 .to_string();
532 (uri, cid, rkey)
533}
534
535#[allow(dead_code)]
536pub async fn create_account_and_login(client: &Client) -> (String, String) {
537 create_account_and_login_internal(client, false).await
538}
539
540#[allow(dead_code)]
541pub async fn create_admin_account_and_login(client: &Client) -> (String, String) {
542 create_account_and_login_internal(client, true).await
543}
544
545async fn create_account_and_login_internal(client: &Client, make_admin: bool) -> (String, String) {
546 let mut last_error = String::new();
547 for attempt in 0..3 {
548 if attempt > 0 {
549 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
550 }
551 let handle = format!("user_{}", uuid::Uuid::new_v4());
552 let payload = json!({
553 "handle": handle,
554 "email": format!("{}@example.com", handle),
555 "password": "password"
556 });
557 let res = match client
558 .post(format!(
559 "{}/xrpc/com.atproto.server.createAccount",
560 base_url().await
561 ))
562 .json(&payload)
563 .send()
564 .await
565 {
566 Ok(r) => r,
567 Err(e) => {
568 last_error = format!("Request failed: {}", e);
569 continue;
570 }
571 };
572 if res.status() == StatusCode::OK {
573 let body: Value = res.json().await.expect("Invalid JSON");
574 let did = body["did"].as_str().expect("No did").to_string();
575 let conn_str = get_db_connection_string().await;
576 let pool = sqlx::postgres::PgPoolOptions::new()
577 .max_connections(2)
578 .connect(&conn_str)
579 .await
580 .expect("Failed to connect to test database");
581 if make_admin {
582 sqlx::query!("UPDATE users SET is_admin = TRUE WHERE did = $1", &did)
583 .execute(&pool)
584 .await
585 .expect("Failed to mark user as admin");
586 }
587 if let Some(access_jwt) = body["accessJwt"].as_str() {
588 return (access_jwt.to_string(), did);
589 }
590 let verification_code: String = sqlx::query_scalar!(
591 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'",
592 &did
593 )
594 .fetch_one(&pool)
595 .await
596 .expect("Failed to get verification code");
597
598 let confirm_payload = json!({
599 "did": did,
600 "verificationCode": verification_code
601 });
602 let confirm_res = client
603 .post(format!(
604 "{}/xrpc/com.atproto.server.confirmSignup",
605 base_url().await
606 ))
607 .json(&confirm_payload)
608 .send()
609 .await
610 .expect("confirmSignup request failed");
611 if confirm_res.status() == StatusCode::OK {
612 let confirm_body: Value = confirm_res
613 .json()
614 .await
615 .expect("Invalid JSON from confirmSignup");
616 let access_jwt = confirm_body["accessJwt"]
617 .as_str()
618 .expect("No accessJwt in confirmSignup response")
619 .to_string();
620 return (access_jwt, did);
621 }
622 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
623 continue;
624 }
625 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
626 }
627 panic!("Failed to create account after 3 attempts: {}", last_error);
628}