this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17static SERVER_URL: OnceLock<String> = OnceLock::new();
18static APP_PORT: OnceLock<u16> = OnceLock::new();
19static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
20#[cfg(not(feature = "external-infra"))]
21use testcontainers::core::ContainerPort;
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
24#[cfg(not(feature = "external-infra"))]
25use testcontainers_modules::postgres::Postgres;
26#[cfg(not(feature = "external-infra"))]
27static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
28#[cfg(not(feature = "external-infra"))]
29static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
/// Bearer token sent by the request helpers in this module
/// (`upload_test_blob`, `create_test_post`).
#[allow(dead_code)]
pub const AUTH_TOKEN: &str = "test-token";
/// Token value presumably meant to be rejected by the server — not used in this
/// part of the file; confirm against the tests that reference it.
#[allow(dead_code)]
pub const BAD_AUTH_TOKEN: &str = "bad-token";
/// DID used as the `repo` when `create_test_post` creates a record.
#[allow(dead_code)]
pub const AUTH_DID: &str = "did:plc:fake";
/// A second DID, presumably the target of cross-account operations — not used in
/// this part of the file; confirm against the tests that reference it.
#[allow(dead_code)]
pub const TARGET_DID: &str = "did:plc:target";
/// Reports whether the test run should use externally provided infrastructure.
///
/// Returns `true` when `BSPDS_TEST_INFRA_READY` is set, or when both
/// `DATABASE_URL` and `S3_ENDPOINT` are set. A variable counts as "set" only if
/// `std::env::var` can read it (present and valid Unicode).
fn has_external_infra() -> bool {
    let is_set = |name: &str| std::env::var(name).is_ok();

    if is_set("BSPDS_TEST_INFRA_READY") {
        return true;
    }
    is_set("DATABASE_URL") && is_set("S3_ENDPOINT")
}
/// Exit hook (registered through `ctor::dtor`) that tries to remove containers
/// created by the test run.
///
/// Does nothing when `has_external_infra()` is true. Otherwise it invokes
/// `podman rm -f` (only when `XDG_RUNTIME_DIR` is set) and then
/// `docker container prune -f`, both filtered on the `bspds_test=true` label.
/// Both commands are best-effort: their results are deliberately discarded.
#[cfg(test)]
#[ctor::dtor]
fn cleanup() {
    // Label attached to every container this module starts.
    const LABEL_FILTER: &str = "label=bspds_test=true";

    if !has_external_infra() {
        if std::env::var("XDG_RUNTIME_DIR").is_ok() {
            let _ = std::process::Command::new("podman")
                .args(["rm", "-f", "--filter", LABEL_FILTER])
                .output();
        }
        let _ = std::process::Command::new("docker")
            .args(["container", "prune", "-f", "--filter", LABEL_FILTER])
            .output();
    }
}
57#[allow(dead_code)]
58pub fn client() -> Client {
59 Client::new()
60}
61#[allow(dead_code)]
62pub fn app_port() -> u16 {
63 *APP_PORT.get().expect("APP_PORT not initialized")
64}
65pub async fn base_url() -> &'static str {
66 SERVER_URL.get_or_init(|| {
67 let (tx, rx) = std::sync::mpsc::channel();
68 std::thread::spawn(move || {
69 if std::env::var("DOCKER_HOST").is_err() {
70 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
71 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
72 if podman_sock.exists() {
73 unsafe {
74 std::env::set_var(
75 "DOCKER_HOST",
76 format!("unix://{}", podman_sock.display()),
77 );
78 }
79 }
80 }
81 }
82 let rt = tokio::runtime::Runtime::new().unwrap();
83 rt.block_on(async move {
84 if has_external_infra() {
85 let url = setup_with_external_infra().await;
86 tx.send(url).unwrap();
87 } else {
88 let url = setup_with_testcontainers().await;
89 tx.send(url).unwrap();
90 }
91 std::future::pending::<()>().await;
92 });
93 });
94 rx.recv().expect("Failed to start test server")
95 })
96}
97async fn setup_with_external_infra() -> String {
98 let database_url = std::env::var("DATABASE_URL")
99 .expect("DATABASE_URL must be set when using external infra");
100 let s3_endpoint = std::env::var("S3_ENDPOINT")
101 .expect("S3_ENDPOINT must be set when using external infra");
102 unsafe {
103 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()));
104 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()));
105 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()));
106 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()));
107 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
108 }
109 let mock_server = MockServer::start().await;
110 setup_mock_appview(&mock_server).await;
111 unsafe {
112 std::env::set_var("APPVIEW_URL", mock_server.uri());
113 }
114 MOCK_APPVIEW.set(mock_server).ok();
115 spawn_app(database_url).await
116}
117#[cfg(not(feature = "external-infra"))]
118async fn setup_with_testcontainers() -> String {
119 let s3_container = GenericImage::new("minio/minio", "latest")
120 .with_exposed_port(ContainerPort::Tcp(9000))
121 .with_env_var("MINIO_ROOT_USER", "minioadmin")
122 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
123 .with_cmd(vec!["server".to_string(), "/data".to_string()])
124 .with_label("bspds_test", "true")
125 .start()
126 .await
127 .expect("Failed to start MinIO");
128 let s3_port = s3_container
129 .get_host_port_ipv4(9000)
130 .await
131 .expect("Failed to get S3 port");
132 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
133 unsafe {
134 std::env::set_var("S3_BUCKET", "test-bucket");
135 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
136 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
137 std::env::set_var("AWS_REGION", "us-east-1");
138 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
139 }
140 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
141 .region("us-east-1")
142 .endpoint_url(&s3_endpoint)
143 .credentials_provider(Credentials::new(
144 "minioadmin",
145 "minioadmin",
146 None,
147 None,
148 "test",
149 ))
150 .load()
151 .await;
152 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
153 .force_path_style(true)
154 .build();
155 let s3_client = S3Client::from_conf(s3_config);
156 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
157 let mock_server = MockServer::start().await;
158 setup_mock_appview(&mock_server).await;
159 unsafe {
160 std::env::set_var("APPVIEW_URL", mock_server.uri());
161 }
162 MOCK_APPVIEW.set(mock_server).ok();
163 S3_CONTAINER.set(s3_container).ok();
164 let container = Postgres::default()
165 .with_tag("18-alpine")
166 .with_label("bspds_test", "true")
167 .start()
168 .await
169 .expect("Failed to start Postgres");
170 let connection_string = format!(
171 "postgres://postgres:postgres@127.0.0.1:{}",
172 container
173 .get_host_port_ipv4(5432)
174 .await
175 .expect("Failed to get port")
176 );
177 DB_CONTAINER.set(container).ok();
178 spawn_app(connection_string).await
179}
/// Stand-in compiled when the `external-infra` feature is enabled, so that
/// `base_url` still has a `setup_with_testcontainers` to call.
///
/// # Panics
/// Always panics: containers are unavailable in this configuration and the
/// caller is expected to provide `DATABASE_URL` and `S3_ENDPOINT` instead.
#[cfg(feature = "external-infra")]
async fn setup_with_testcontainers() -> String {
    panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT.");
}
184async fn setup_mock_appview(mock_server: &MockServer) {
185 Mock::given(method("GET"))
186 .and(path("/xrpc/app.bsky.actor.getProfile"))
187 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
188 "handle": "mock.handle",
189 "did": "did:plc:mock",
190 "displayName": "Mock User"
191 })))
192 .mount(mock_server)
193 .await;
194 Mock::given(method("GET"))
195 .and(path("/xrpc/app.bsky.actor.searchActors"))
196 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
197 "actors": [],
198 "cursor": null
199 })))
200 .mount(mock_server)
201 .await;
202 Mock::given(method("GET"))
203 .and(path("/xrpc/app.bsky.feed.getTimeline"))
204 .respond_with(
205 ResponseTemplate::new(200)
206 .insert_header("atproto-repo-rev", "0")
207 .set_body_json(json!({
208 "feed": [],
209 "cursor": null
210 }))
211 )
212 .mount(mock_server)
213 .await;
214 Mock::given(method("GET"))
215 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
216 .respond_with(
217 ResponseTemplate::new(200)
218 .insert_header("atproto-repo-rev", "0")
219 .set_body_json(json!({
220 "feed": [{
221 "post": {
222 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
223 "cid": "bafyappview123",
224 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
225 "record": {
226 "$type": "app.bsky.feed.post",
227 "text": "Author feed post from appview",
228 "createdAt": "2025-01-01T00:00:00Z"
229 },
230 "indexedAt": "2025-01-01T00:00:00Z"
231 }
232 }],
233 "cursor": "author-cursor"
234 })),
235 )
236 .mount(mock_server)
237 .await;
238 Mock::given(method("GET"))
239 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
240 .respond_with(
241 ResponseTemplate::new(200)
242 .insert_header("atproto-repo-rev", "0")
243 .set_body_json(json!({
244 "feed": [{
245 "post": {
246 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
247 "cid": "bafyliked123",
248 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
249 "record": {
250 "$type": "app.bsky.feed.post",
251 "text": "Liked post from appview",
252 "createdAt": "2025-01-01T00:00:00Z"
253 },
254 "indexedAt": "2025-01-01T00:00:00Z"
255 }
256 }],
257 "cursor": null
258 })),
259 )
260 .mount(mock_server)
261 .await;
262 Mock::given(method("GET"))
263 .and(path("/xrpc/app.bsky.feed.getPostThread"))
264 .respond_with(
265 ResponseTemplate::new(200)
266 .insert_header("atproto-repo-rev", "0")
267 .set_body_json(json!({
268 "thread": {
269 "$type": "app.bsky.feed.defs#threadViewPost",
270 "post": {
271 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
272 "cid": "bafythread123",
273 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
274 "record": {
275 "$type": "app.bsky.feed.post",
276 "text": "Thread post from appview",
277 "createdAt": "2025-01-01T00:00:00Z"
278 },
279 "indexedAt": "2025-01-01T00:00:00Z"
280 },
281 "replies": []
282 }
283 })),
284 )
285 .mount(mock_server)
286 .await;
287 Mock::given(method("GET"))
288 .and(path("/xrpc/app.bsky.feed.getFeed"))
289 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
290 "feed": [{
291 "post": {
292 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
293 "cid": "bafyfeed123",
294 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
295 "record": {
296 "$type": "app.bsky.feed.post",
297 "text": "Custom feed post from appview",
298 "createdAt": "2025-01-01T00:00:00Z"
299 },
300 "indexedAt": "2025-01-01T00:00:00Z"
301 }
302 }],
303 "cursor": null
304 })))
305 .mount(mock_server)
306 .await;
307 Mock::given(method("POST"))
308 .and(path("/xrpc/app.bsky.notification.registerPush"))
309 .respond_with(ResponseTemplate::new(200))
310 .mount(mock_server)
311 .await;
312}
313async fn spawn_app(database_url: String) -> String {
314 use bspds::rate_limit::RateLimiters;
315 let pool = PgPoolOptions::new()
316 .max_connections(50)
317 .connect(&database_url)
318 .await
319 .expect("Failed to connect to Postgres. Make sure the database is running.");
320 sqlx::migrate!("./migrations")
321 .run(&pool)
322 .await
323 .expect("Failed to run migrations");
324 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
325 let addr = listener.local_addr().unwrap();
326 APP_PORT.set(addr.port()).ok();
327 unsafe {
328 std::env::set_var("PDS_HOSTNAME", addr.to_string());
329 }
330 let rate_limiters = RateLimiters::new()
331 .with_login_limit(10000)
332 .with_account_creation_limit(10000)
333 .with_password_reset_limit(10000)
334 .with_email_update_limit(10000)
335 .with_oauth_authorize_limit(10000)
336 .with_oauth_token_limit(10000);
337 let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
338 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
339 let app = bspds::app(state);
340 tokio::spawn(async move {
341 axum::serve(listener, app).await.unwrap();
342 });
343 format!("http://{}", addr)
344}
345#[allow(dead_code)]
346pub async fn get_db_connection_string() -> String {
347 base_url().await;
348 if has_external_infra() {
349 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
350 } else {
351 #[cfg(not(feature = "external-infra"))]
352 {
353 let container = DB_CONTAINER.get().expect("DB container not initialized");
354 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
355 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
356 }
357 #[cfg(feature = "external-infra")]
358 {
359 panic!("DATABASE_URL must be set with external-infra feature");
360 }
361 }
362}
363#[allow(dead_code)]
364pub async fn verify_new_account(client: &Client, did: &str) -> String {
365 let conn_str = get_db_connection_string().await;
366 let pool = sqlx::postgres::PgPoolOptions::new()
367 .max_connections(2)
368 .connect(&conn_str)
369 .await
370 .expect("Failed to connect to test database");
371 let verification_code: String = sqlx::query_scalar!(
372 "SELECT email_confirmation_code FROM users WHERE did = $1",
373 did
374 )
375 .fetch_one(&pool)
376 .await
377 .expect("Failed to get verification code")
378 .expect("No verification code found");
379 let confirm_payload = json!({
380 "did": did,
381 "verificationCode": verification_code
382 });
383 let confirm_res = client
384 .post(format!(
385 "{}/xrpc/com.atproto.server.confirmSignup",
386 base_url().await
387 ))
388 .json(&confirm_payload)
389 .send()
390 .await
391 .expect("confirmSignup request failed");
392 assert_eq!(confirm_res.status(), StatusCode::OK, "confirmSignup failed");
393 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup");
394 confirm_body["accessJwt"]
395 .as_str()
396 .expect("No accessJwt in confirmSignup response")
397 .to_string()
398}
399#[allow(dead_code)]
400pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
401 let res = client
402 .post(format!(
403 "{}/xrpc/com.atproto.repo.uploadBlob",
404 base_url().await
405 ))
406 .header(header::CONTENT_TYPE, mime)
407 .bearer_auth(AUTH_TOKEN)
408 .body(data)
409 .send()
410 .await
411 .expect("Failed to send uploadBlob request");
412 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
413 let body: Value = res.json().await.expect("Blob upload response was not JSON");
414 body["blob"].clone()
415}
416#[allow(dead_code)]
417pub async fn create_test_post(
418 client: &Client,
419 text: &str,
420 reply_to: Option<Value>,
421) -> (String, String, String) {
422 let collection = "app.bsky.feed.post";
423 let mut record = json!({
424 "$type": collection,
425 "text": text,
426 "createdAt": Utc::now().to_rfc3339()
427 });
428 if let Some(reply_obj) = reply_to {
429 record["reply"] = reply_obj;
430 }
431 let payload = json!({
432 "repo": AUTH_DID,
433 "collection": collection,
434 "record": record
435 });
436 let res = client
437 .post(format!(
438 "{}/xrpc/com.atproto.repo.createRecord",
439 base_url().await
440 ))
441 .bearer_auth(AUTH_TOKEN)
442 .json(&payload)
443 .send()
444 .await
445 .expect("Failed to send createRecord");
446 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
447 let body: Value = res
448 .json()
449 .await
450 .expect("createRecord response was not JSON");
451 let uri = body["uri"]
452 .as_str()
453 .expect("Response had no URI")
454 .to_string();
455 let cid = body["cid"]
456 .as_str()
457 .expect("Response had no CID")
458 .to_string();
459 let rkey = uri
460 .split('/')
461 .last()
462 .expect("URI was malformed")
463 .to_string();
464 (uri, cid, rkey)
465}
466#[allow(dead_code)]
467pub async fn create_account_and_login(client: &Client) -> (String, String) {
468 let mut last_error = String::new();
469 for attempt in 0..3 {
470 if attempt > 0 {
471 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
472 }
473 let handle = format!("user_{}", uuid::Uuid::new_v4());
474 let payload = json!({
475 "handle": handle,
476 "email": format!("{}@example.com", handle),
477 "password": "password"
478 });
479 let res = match client
480 .post(format!(
481 "{}/xrpc/com.atproto.server.createAccount",
482 base_url().await
483 ))
484 .json(&payload)
485 .send()
486 .await
487 {
488 Ok(r) => r,
489 Err(e) => {
490 last_error = format!("Request failed: {}", e);
491 continue;
492 }
493 };
494 if res.status() == StatusCode::OK {
495 let body: Value = res.json().await.expect("Invalid JSON");
496 if let Some(access_jwt) = body["accessJwt"].as_str() {
497 let did = body["did"].as_str().expect("No did").to_string();
498 return (access_jwt.to_string(), did);
499 }
500 let did = body["did"].as_str().expect("No did").to_string();
501 let conn_str = get_db_connection_string().await;
502 let pool = sqlx::postgres::PgPoolOptions::new()
503 .max_connections(2)
504 .connect(&conn_str)
505 .await
506 .expect("Failed to connect to test database");
507 let verification_code: String = sqlx::query_scalar!(
508 "SELECT email_confirmation_code FROM users WHERE did = $1",
509 &did
510 )
511 .fetch_one(&pool)
512 .await
513 .expect("Failed to get verification code")
514 .expect("No verification code found");
515 let confirm_payload = json!({
516 "did": did,
517 "verificationCode": verification_code
518 });
519 let confirm_res = client
520 .post(format!(
521 "{}/xrpc/com.atproto.server.confirmSignup",
522 base_url().await
523 ))
524 .json(&confirm_payload)
525 .send()
526 .await
527 .expect("confirmSignup request failed");
528 if confirm_res.status() == StatusCode::OK {
529 let confirm_body: Value = confirm_res.json().await.expect("Invalid JSON from confirmSignup");
530 let access_jwt = confirm_body["accessJwt"]
531 .as_str()
532 .expect("No accessJwt in confirmSignup response")
533 .to_string();
534 return (access_jwt, did);
535 }
536 last_error = format!("confirmSignup failed: {:?}", confirm_res.text().await);
537 continue;
538 }
539 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
540 }
541 panic!("Failed to create account after 3 attempts: {}", last_error);
542}