this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
/// Base URL (`http://<addr>`) of the in-process test server; initialized once by `base_url`.
18static SERVER_URL: OnceLock<String> = OnceLock::new();
/// Port the test server's listener was bound to; set in `spawn_app` and read through `app_port`.
19static APP_PORT: OnceLock<u16> = OnceLock::new();
/// Mock AppView server created during setup.
/// NOTE(review): presumably parked here so the mock stays alive for the whole test run — confirm.
20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
/// Postgres container started by `setup_with_testcontainers`; `get_db_connection_string`
/// reads its mapped port from here.
28#[cfg(not(feature = "external-infra"))]
29static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
/// MinIO container started by `setup_with_testcontainers`.
/// NOTE(review): presumably stored so the container is not dropped while tests run — confirm.
30#[cfg(not(feature = "external-infra"))]
31static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
32
/// Bearer token sent by the helpers in this file (`upload_test_blob`, `create_test_post`).
33#[allow(dead_code)]
34pub const AUTH_TOKEN: &str = "test-token";
/// NOTE(review): not used in this file; presumably a token the server is expected to reject — confirm.
35#[allow(dead_code)]
36pub const BAD_AUTH_TOKEN: &str = "bad-token";
/// DID used as the `repo` when `create_test_post` creates records.
37#[allow(dead_code)]
38pub const AUTH_DID: &str = "did:plc:fake";
/// NOTE(review): not used in this file; presumably the DID of a second account targeted by tests — confirm.
39#[allow(dead_code)]
40pub const TARGET_DID: &str = "did:plc:target";
41
/// Reports whether the environment points at externally managed infrastructure.
///
/// Returns `true` when `BSPDS_TEST_INFRA_READY` is readable, or when both
/// `DATABASE_URL` and `S3_ENDPOINT` are. A variable counts only if
/// `std::env::var` succeeds, i.e. it is present and valid Unicode.
fn has_external_infra() -> bool {
    let is_set = |name: &str| std::env::var(name).is_ok();
    if is_set("BSPDS_TEST_INFRA_READY") {
        return true;
    }
    is_set("DATABASE_URL") && is_set("S3_ENDPOINT")
}
/// Best-effort removal of containers labelled `bspds_test=true`.
///
/// Registered through `#[ctor::dtor]` (NOTE(review): per the `ctor` crate this
/// should run when the test binary exits — confirm). Does nothing when external
/// infrastructure is in use. Otherwise it invokes `podman rm` (only if
/// `XDG_RUNTIME_DIR` is set) and then `docker container prune`, ignoring any
/// failure to spawn either command as well as their exit status.
#[cfg(test)]
#[ctor::dtor]
fn cleanup() {
    if has_external_infra() {
        return;
    }

    let label_filter = "label=bspds_test=true";

    if std::env::var("XDG_RUNTIME_DIR").is_ok() {
        let mut podman = std::process::Command::new("podman");
        podman.args(["rm", "-f", "--filter", label_filter]);
        // Errors are deliberately ignored: podman may simply not be installed.
        let _ = podman.output();
    }

    let mut docker = std::process::Command::new("docker");
    docker.args(["container", "prune", "-f", "--filter", label_filter]);
    // Same best-effort policy as above.
    let _ = docker.output();
}
67
68#[allow(dead_code)]
69pub fn client() -> Client {
70 Client::new()
71}
72
73#[allow(dead_code)]
74pub fn app_port() -> u16 {
75 *APP_PORT.get().expect("APP_PORT not initialized")
76}
77
78pub async fn base_url() -> &'static str {
79 SERVER_URL.get_or_init(|| {
80 let (tx, rx) = std::sync::mpsc::channel();
81 std::thread::spawn(move || {
82 unsafe {
83 std::env::set_var("BSPDS_ALLOW_INSECURE_SECRETS", "1");
84 }
85 if std::env::var("DOCKER_HOST").is_err() {
86 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
87 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
88 if podman_sock.exists() {
89 unsafe {
90 std::env::set_var(
91 "DOCKER_HOST",
92 format!("unix://{}", podman_sock.display()),
93 );
94 }
95 }
96 }
97 }
98 let rt = tokio::runtime::Runtime::new().unwrap();
99 rt.block_on(async move {
100 if has_external_infra() {
101 let url = setup_with_external_infra().await;
102 tx.send(url).unwrap();
103 } else {
104 let url = setup_with_testcontainers().await;
105 tx.send(url).unwrap();
106 }
107 std::future::pending::<()>().await;
108 });
109 });
110 rx.recv().expect("Failed to start test server")
111 })
112}
113
114async fn setup_with_external_infra() -> String {
115 let database_url =
116 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
117 let s3_endpoint =
118 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra");
119 unsafe {
120 std::env::set_var(
121 "S3_BUCKET",
122 std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
123 );
124 std::env::set_var(
125 "AWS_ACCESS_KEY_ID",
126 std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
127 );
128 std::env::set_var(
129 "AWS_SECRET_ACCESS_KEY",
130 std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
131 );
132 std::env::set_var(
133 "AWS_REGION",
134 std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
135 );
136 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
137 }
138 let mock_server = MockServer::start().await;
139 setup_mock_appview(&mock_server).await;
140 unsafe {
141 std::env::set_var("APPVIEW_URL", mock_server.uri());
142 }
143 MOCK_APPVIEW.set(mock_server).ok();
144 spawn_app(database_url).await
145}
146
147#[cfg(not(feature = "external-infra"))]
148async fn setup_with_testcontainers() -> String {
149 let s3_container = GenericImage::new("minio/minio", "latest")
150 .with_exposed_port(ContainerPort::Tcp(9000))
151 .with_env_var("MINIO_ROOT_USER", "minioadmin")
152 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
153 .with_cmd(vec!["server".to_string(), "/data".to_string()])
154 .with_label("bspds_test", "true")
155 .start()
156 .await
157 .expect("Failed to start MinIO");
158 let s3_port = s3_container
159 .get_host_port_ipv4(9000)
160 .await
161 .expect("Failed to get S3 port");
162 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
163 unsafe {
164 std::env::set_var("S3_BUCKET", "test-bucket");
165 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
166 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
167 std::env::set_var("AWS_REGION", "us-east-1");
168 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
169 }
170 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
171 .region("us-east-1")
172 .endpoint_url(&s3_endpoint)
173 .credentials_provider(Credentials::new(
174 "minioadmin",
175 "minioadmin",
176 None,
177 None,
178 "test",
179 ))
180 .load()
181 .await;
182 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
183 .force_path_style(true)
184 .build();
185 let s3_client = S3Client::from_conf(s3_config);
186 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
187 let mock_server = MockServer::start().await;
188 setup_mock_appview(&mock_server).await;
189 unsafe {
190 std::env::set_var("APPVIEW_URL", mock_server.uri());
191 }
192 MOCK_APPVIEW.set(mock_server).ok();
193 S3_CONTAINER.set(s3_container).ok();
194 let container = Postgres::default()
195 .with_tag("18-alpine")
196 .with_label("bspds_test", "true")
197 .start()
198 .await
199 .expect("Failed to start Postgres");
200 let connection_string = format!(
201 "postgres://postgres:postgres@127.0.0.1:{}",
202 container
203 .get_host_port_ipv4(5432)
204 .await
205 .expect("Failed to get port")
206 );
207 DB_CONTAINER.set(container).ok();
208 spawn_app(connection_string).await
209}
210
/// Stand-in used when the `external-infra` feature is enabled: testcontainers
/// are compiled out, so reaching this function always panics.
///
/// # Panics
/// Always.
#[cfg(feature = "external-infra")]
async fn setup_with_testcontainers() -> String {
    const REASON: &str =
        "Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT.";
    panic!("{}", REASON)
}
217
218async fn setup_mock_appview(mock_server: &MockServer) {
219 Mock::given(method("GET"))
220 .and(path("/xrpc/app.bsky.actor.getProfile"))
221 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
222 "handle": "mock.handle",
223 "did": "did:plc:mock",
224 "displayName": "Mock User"
225 })))
226 .mount(mock_server)
227 .await;
228 Mock::given(method("GET"))
229 .and(path("/xrpc/app.bsky.actor.searchActors"))
230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
231 "actors": [],
232 "cursor": null
233 })))
234 .mount(mock_server)
235 .await;
236 Mock::given(method("GET"))
237 .and(path("/xrpc/app.bsky.feed.getTimeline"))
238 .respond_with(
239 ResponseTemplate::new(200)
240 .insert_header("atproto-repo-rev", "0")
241 .set_body_json(json!({
242 "feed": [],
243 "cursor": null
244 })),
245 )
246 .mount(mock_server)
247 .await;
248 Mock::given(method("GET"))
249 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
250 .respond_with(
251 ResponseTemplate::new(200)
252 .insert_header("atproto-repo-rev", "0")
253 .set_body_json(json!({
254 "feed": [{
255 "post": {
256 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
257 "cid": "bafyappview123",
258 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
259 "record": {
260 "$type": "app.bsky.feed.post",
261 "text": "Author feed post from appview",
262 "createdAt": "2025-01-01T00:00:00Z"
263 },
264 "indexedAt": "2025-01-01T00:00:00Z"
265 }
266 }],
267 "cursor": "author-cursor"
268 })),
269 )
270 .mount(mock_server)
271 .await;
272 Mock::given(method("GET"))
273 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
274 .respond_with(
275 ResponseTemplate::new(200)
276 .insert_header("atproto-repo-rev", "0")
277 .set_body_json(json!({
278 "feed": [{
279 "post": {
280 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
281 "cid": "bafyliked123",
282 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
283 "record": {
284 "$type": "app.bsky.feed.post",
285 "text": "Liked post from appview",
286 "createdAt": "2025-01-01T00:00:00Z"
287 },
288 "indexedAt": "2025-01-01T00:00:00Z"
289 }
290 }],
291 "cursor": null
292 })),
293 )
294 .mount(mock_server)
295 .await;
296 Mock::given(method("GET"))
297 .and(path("/xrpc/app.bsky.feed.getPostThread"))
298 .respond_with(
299 ResponseTemplate::new(200)
300 .insert_header("atproto-repo-rev", "0")
301 .set_body_json(json!({
302 "thread": {
303 "$type": "app.bsky.feed.defs#threadViewPost",
304 "post": {
305 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
306 "cid": "bafythread123",
307 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
308 "record": {
309 "$type": "app.bsky.feed.post",
310 "text": "Thread post from appview",
311 "createdAt": "2025-01-01T00:00:00Z"
312 },
313 "indexedAt": "2025-01-01T00:00:00Z"
314 },
315 "replies": []
316 }
317 })),
318 )
319 .mount(mock_server)
320 .await;
321 Mock::given(method("GET"))
322 .and(path("/xrpc/app.bsky.feed.getFeed"))
323 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
324 "feed": [{
325 "post": {
326 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
327 "cid": "bafyfeed123",
328 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
329 "record": {
330 "$type": "app.bsky.feed.post",
331 "text": "Custom feed post from appview",
332 "createdAt": "2025-01-01T00:00:00Z"
333 },
334 "indexedAt": "2025-01-01T00:00:00Z"
335 }
336 }],
337 "cursor": null
338 })))
339 .mount(mock_server)
340 .await;
341 Mock::given(method("POST"))
342 .and(path("/xrpc/app.bsky.notification.registerPush"))
343 .respond_with(ResponseTemplate::new(200))
344 .mount(mock_server)
345 .await;
346}
347
348async fn spawn_app(database_url: String) -> String {
349 use bspds::rate_limit::RateLimiters;
350 let pool = PgPoolOptions::new()
351 .max_connections(50)
352 .connect(&database_url)
353 .await
354 .expect("Failed to connect to Postgres. Make sure the database is running.");
355 sqlx::migrate!("./migrations")
356 .run(&pool)
357 .await
358 .expect("Failed to run migrations");
359 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
360 let addr = listener.local_addr().unwrap();
361 APP_PORT.set(addr.port()).ok();
362 unsafe {
363 std::env::set_var("PDS_HOSTNAME", addr.to_string());
364 }
365 let rate_limiters = RateLimiters::new()
366 .with_login_limit(10000)
367 .with_account_creation_limit(10000)
368 .with_password_reset_limit(10000)
369 .with_email_update_limit(10000)
370 .with_oauth_authorize_limit(10000)
371 .with_oauth_token_limit(10000);
372 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
373 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
374 let app = bspds::app(state);
375 tokio::spawn(async move {
376 axum::serve(listener, app).await.unwrap();
377 });
378 format!("http://{}", addr)
379}
380
381#[allow(dead_code)]
382pub async fn get_db_connection_string() -> String {
383 base_url().await;
384 if has_external_infra() {
385 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
386 } else {
387 #[cfg(not(feature = "external-infra"))]
388 {
389 let container = DB_CONTAINER.get().expect("DB container not initialized");
390 let port = container
391 .get_host_port_ipv4(5432)
392 .await
393 .expect("Failed to get port");
394 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
395 }
396 #[cfg(feature = "external-infra")]
397 {
398 panic!("DATABASE_URL must be set with external-infra feature");
399 }
400 }
401}
402
403#[allow(dead_code)]
404pub async fn verify_new_account(client: &Client, did: &str) -> String {
405 let conn_str = get_db_connection_string().await;
406 let pool = sqlx::postgres::PgPoolOptions::new()
407 .max_connections(2)
408 .connect(&conn_str)
409 .await
410 .expect("Failed to connect to test database");
411 let verification_code: String = sqlx::query_scalar!(
412 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'",
413 did
414 )
415 .fetch_one(&pool)
416 .await
417 .expect("Failed to get verification code");
418
419 let confirm_payload = json!({
420 "did": did,
421 "verificationCode": verification_code
422 });
423 let confirm_res = client
424 .post(format!(
425 "{}/xrpc/com.atproto.server.confirmSignup",
426 base_url().await
427 ))
428 .json(&confirm_payload)
429 .send()
430 .await
431 .expect("confirmSignup request failed");
432 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
433 let confirm_body: Value = confirm_res
434 .json()
435 .await
436 .expect("Invalid JSON from confirmSignup");
437 confirm_body["accessJwt"]
438 .as_str()
439 .expect("No accessJwt in confirmSignup response")
440 .to_string()
441}
442
443#[allow(dead_code)]
444pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
445 let res = client
446 .post(format!(
447 "{}/xrpc/com.atproto.repo.uploadBlob",
448 base_url().await
449 ))
450 .header(header::CONTENT_TYPE, mime)
451 .bearer_auth(AUTH_TOKEN)
452 .body(data)
453 .send()
454 .await
455 .expect("Failed to send uploadBlob request");
456 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
457 let body: Value = res.json().await.expect("Blob upload response was not JSON");
458 body["blob"].clone()
459}
460
461#[allow(dead_code)]
462pub async fn create_test_post(
463 client: &Client,
464 text: &str,
465 reply_to: Option<Value>,
466) -> (String, String, String) {
467 let collection = "app.bsky.feed.post";
468 let mut record = json!({
469 "$type": collection,
470 "text": text,
471 "createdAt": Utc::now().to_rfc3339()
472 });
473 if let Some(reply_obj) = reply_to {
474 record["reply"] = reply_obj;
475 }
476 let payload = json!({
477 "repo": AUTH_DID,
478 "collection": collection,
479 "record": record
480 });
481 let res = client
482 .post(format!(
483 "{}/xrpc/com.atproto.repo.createRecord",
484 base_url().await
485 ))
486 .bearer_auth(AUTH_TOKEN)
487 .json(&payload)
488 .send()
489 .await
490 .expect("Failed to send createRecord");
491 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
492 let body: Value = res
493 .json()
494 .await
495 .expect("createRecord response was not JSON");
496 let uri = body["uri"]
497 .as_str()
498 .expect("Response had no URI")
499 .to_string();
500 let cid = body["cid"]
501 .as_str()
502 .expect("Response had no CID")
503 .to_string();
504 let rkey = uri
505 .split('/')
506 .last()
507 .expect("URI was malformed")
508 .to_string();
509 (uri, cid, rkey)
510}
511
512#[allow(dead_code)]
513pub async fn create_account_and_login(client: &Client) -> (String, String) {
514 let mut last_error = String::new();
515 for attempt in 0..3 {
516 if attempt > 0 {
517 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
518 }
519 let handle = format!("user_{}", uuid::Uuid::new_v4());
520 let payload = json!({
521 "handle": handle,
522 "email": format!("{}@example.com", handle),
523 "password": "password"
524 });
525 let res = match client
526 .post(format!(
527 "{}/xrpc/com.atproto.server.createAccount",
528 base_url().await
529 ))
530 .json(&payload)
531 .send()
532 .await
533 {
534 Ok(r) => r,
535 Err(e) => {
536 last_error = format!("Request failed: {}", e);
537 continue;
538 }
539 };
540 if res.status() == StatusCode::OK {
541 let body: Value = res.json().await.expect("Invalid JSON");
542 if let Some(access_jwt) = body["accessJwt"].as_str() {
543 let did = body["did"].as_str().expect("No did").to_string();
544 return (access_jwt.to_string(), did);
545 }
546 let did = body["did"].as_str().expect("No did").to_string();
547 let conn_str = get_db_connection_string().await;
548 let pool = sqlx::postgres::PgPoolOptions::new()
549 .max_connections(2)
550 .connect(&conn_str)
551 .await
552 .expect("Failed to connect to test database");
553 let verification_code: String = sqlx::query_scalar!(
554 "SELECT code FROM channel_verifications WHERE user_id = (SELECT id FROM users WHERE did = $1) AND channel = 'email'",
555 &did
556 )
557 .fetch_one(&pool)
558 .await
559 .expect("Failed to get verification code");
560
561 let confirm_payload = json!({
562 "did": did,
563 "verificationCode": verification_code
564 });
565 let confirm_res = client
566 .post(format!(
567 "{}/xrpc/com.atproto.server.confirmSignup",
568 base_url().await
569 ))
570 .json(&confirm_payload)
571 .send()
572 .await
573 .expect("confirmSignup request failed");
574 if confirm_res.status() == StatusCode::OK {
575 let confirm_body: Value = confirm_res
576 .json()
577 .await
578 .expect("Invalid JSON from confirmSignup");
579 let access_jwt = confirm_body["accessJwt"]
580 .as_str()
581 .expect("No accessJwt in confirmSignup response")
582 .to_string();
583 return (access_jwt, did);
584 }
585 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
586 continue;
587 }
588 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
589 }
590 panic!("Failed to create account after 3 attempts: {}", last_error);
591}