this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use tokio::net::TcpListener;
15use wiremock::matchers::{method, path};
16use wiremock::{Mock, MockServer, ResponseTemplate};
17
18static SERVER_URL: OnceLock<String> = OnceLock::new();
19static APP_PORT: OnceLock<u16> = OnceLock::new();
20static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
21
22#[cfg(not(feature = "external-infra"))]
23use testcontainers::core::ContainerPort;
24#[cfg(not(feature = "external-infra"))]
25use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
26#[cfg(not(feature = "external-infra"))]
27use testcontainers_modules::postgres::Postgres;
28
29#[cfg(not(feature = "external-infra"))]
30static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
31#[cfg(not(feature = "external-infra"))]
32static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
33
// NOTE(review): the `dead_code` allowances are presumably needed because not every
// test binary that includes this module uses every item -- confirm.

/// Bearer token sent by the request helpers in this module
/// (`upload_test_blob`, `create_test_post`).
#[allow(dead_code)]
pub const AUTH_TOKEN: &str = "test-token";
/// A second token value; presumably one the server is expected to reject -- confirm with callers.
#[allow(dead_code)]
pub const BAD_AUTH_TOKEN: &str = "bad-token";
/// DID used as the `repo` field by `create_test_post`.
#[allow(dead_code)]
pub const AUTH_DID: &str = "did:plc:fake";
/// A second DID; presumably the subject/target account in tests -- confirm with callers.
#[allow(dead_code)]
pub const TARGET_DID: &str = "did:plc:target";
42
/// Reports whether the tests should use externally provided infrastructure.
///
/// Returns `true` when `BSPDS_TEST_INFRA_READY` is set, or when both
/// `DATABASE_URL` and `S3_ENDPOINT` are set. A variable counts as "set" only
/// if `std::env::var` can read it (present and valid Unicode).
fn has_external_infra() -> bool {
    let is_set = |name: &str| std::env::var(name).is_ok();

    if is_set("BSPDS_TEST_INFRA_READY") {
        return true;
    }
    is_set("DATABASE_URL") && is_set("S3_ENDPOINT")
}
47
/// Teardown hook (registered through `#[ctor::dtor]`) that asks the container
/// runtimes to remove containers labelled `bspds_test=true`.
///
/// Skipped entirely when external infrastructure is in use. Both commands are
/// best effort: their results, including a failure to spawn the binary, are
/// ignored.
#[cfg(test)]
#[ctor::dtor]
fn cleanup() {
    const LABEL_FILTER: &str = "label=bspds_test=true";

    if !has_external_infra() {
        // podman is only attempted when XDG_RUNTIME_DIR is set
        // (presumably indicating a rootless podman session -- confirm).
        if std::env::var("XDG_RUNTIME_DIR").is_ok() {
            let _ = std::process::Command::new("podman")
                .args(["rm", "-f", "--filter", LABEL_FILTER])
                .output();
        }

        // docker is always attempted.
        let _ = std::process::Command::new("docker")
            .args(["container", "prune", "-f", "--filter", LABEL_FILTER])
            .output();
    }
}
65
66#[allow(dead_code)]
67pub fn client() -> Client {
68 Client::new()
69}
70
71#[allow(dead_code)]
72pub fn app_port() -> u16 {
73 *APP_PORT.get().expect("APP_PORT not initialized")
74}
75
76pub async fn base_url() -> &'static str {
77 SERVER_URL.get_or_init(|| {
78 let (tx, rx) = std::sync::mpsc::channel();
79
80 std::thread::spawn(move || {
81 if std::env::var("DOCKER_HOST").is_err() {
82 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
83 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
84 if podman_sock.exists() {
85 unsafe {
86 std::env::set_var(
87 "DOCKER_HOST",
88 format!("unix://{}", podman_sock.display()),
89 );
90 }
91 }
92 }
93 }
94
95 let rt = tokio::runtime::Runtime::new().unwrap();
96 rt.block_on(async move {
97 if has_external_infra() {
98 let url = setup_with_external_infra().await;
99 tx.send(url).unwrap();
100 } else {
101 let url = setup_with_testcontainers().await;
102 tx.send(url).unwrap();
103 }
104 std::future::pending::<()>().await;
105 });
106 });
107
108 rx.recv().expect("Failed to start test server")
109 })
110}
111
112async fn setup_with_external_infra() -> String {
113 let database_url = std::env::var("DATABASE_URL")
114 .expect("DATABASE_URL must be set when using external infra");
115 let s3_endpoint = std::env::var("S3_ENDPOINT")
116 .expect("S3_ENDPOINT must be set when using external infra");
117
118 unsafe {
119 std::env::set_var("S3_BUCKET", std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()));
120 std::env::set_var("AWS_ACCESS_KEY_ID", std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()));
121 std::env::set_var("AWS_SECRET_ACCESS_KEY", std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()));
122 std::env::set_var("AWS_REGION", std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()));
123 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
124 }
125
126 let mock_server = MockServer::start().await;
127 setup_mock_appview(&mock_server).await;
128
129 unsafe {
130 std::env::set_var("APPVIEW_URL", mock_server.uri());
131 }
132 MOCK_APPVIEW.set(mock_server).ok();
133
134 spawn_app(database_url).await
135}
136
137#[cfg(not(feature = "external-infra"))]
138async fn setup_with_testcontainers() -> String {
139 let s3_container = GenericImage::new("minio/minio", "latest")
140 .with_exposed_port(ContainerPort::Tcp(9000))
141 .with_env_var("MINIO_ROOT_USER", "minioadmin")
142 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
143 .with_cmd(vec!["server".to_string(), "/data".to_string()])
144 .with_label("bspds_test", "true")
145 .start()
146 .await
147 .expect("Failed to start MinIO");
148
149 let s3_port = s3_container
150 .get_host_port_ipv4(9000)
151 .await
152 .expect("Failed to get S3 port");
153 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
154
155 unsafe {
156 std::env::set_var("S3_BUCKET", "test-bucket");
157 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
158 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
159 std::env::set_var("AWS_REGION", "us-east-1");
160 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
161 }
162
163 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
164 .region("us-east-1")
165 .endpoint_url(&s3_endpoint)
166 .credentials_provider(Credentials::new(
167 "minioadmin",
168 "minioadmin",
169 None,
170 None,
171 "test",
172 ))
173 .load()
174 .await;
175
176 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
177 .force_path_style(true)
178 .build();
179 let s3_client = S3Client::from_conf(s3_config);
180
181 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
182
183 let mock_server = MockServer::start().await;
184 setup_mock_appview(&mock_server).await;
185
186 unsafe {
187 std::env::set_var("APPVIEW_URL", mock_server.uri());
188 }
189 MOCK_APPVIEW.set(mock_server).ok();
190
191 S3_CONTAINER.set(s3_container).ok();
192
193 let container = Postgres::default()
194 .with_tag("18-alpine")
195 .with_label("bspds_test", "true")
196 .start()
197 .await
198 .expect("Failed to start Postgres");
199 let connection_string = format!(
200 "postgres://postgres:postgres@127.0.0.1:{}",
201 container
202 .get_host_port_ipv4(5432)
203 .await
204 .expect("Failed to get port")
205 );
206
207 DB_CONTAINER.set(container).ok();
208
209 spawn_app(connection_string).await
210}
211
/// Stand-in used when the `external-infra` feature is enabled: container-based
/// setup is compiled out, so reaching this function always panics.
///
/// # Panics
///
/// Always.
#[cfg(feature = "external-infra")]
async fn setup_with_testcontainers() -> String {
    panic!("Testcontainers disabled with external-infra feature. Set DATABASE_URL and S3_ENDPOINT.");
}
216
217async fn setup_mock_appview(mock_server: &MockServer) {
218 Mock::given(method("GET"))
219 .and(path("/xrpc/app.bsky.actor.getProfile"))
220 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
221 "handle": "mock.handle",
222 "did": "did:plc:mock",
223 "displayName": "Mock User"
224 })))
225 .mount(mock_server)
226 .await;
227
228 Mock::given(method("GET"))
229 .and(path("/xrpc/app.bsky.actor.searchActors"))
230 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
231 "actors": [],
232 "cursor": null
233 })))
234 .mount(mock_server)
235 .await;
236
237 Mock::given(method("GET"))
238 .and(path("/xrpc/app.bsky.feed.getTimeline"))
239 .respond_with(
240 ResponseTemplate::new(200)
241 .insert_header("atproto-repo-rev", "0")
242 .set_body_json(json!({
243 "feed": [],
244 "cursor": null
245 }))
246 )
247 .mount(mock_server)
248 .await;
249
250 Mock::given(method("GET"))
251 .and(path("/xrpc/app.bsky.feed.getAuthorFeed"))
252 .respond_with(
253 ResponseTemplate::new(200)
254 .insert_header("atproto-repo-rev", "0")
255 .set_body_json(json!({
256 "feed": [{
257 "post": {
258 "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author",
259 "cid": "bafyappview123",
260 "author": {"did": "did:plc:mock-author", "handle": "mock.author"},
261 "record": {
262 "$type": "app.bsky.feed.post",
263 "text": "Author feed post from appview",
264 "createdAt": "2025-01-01T00:00:00Z"
265 },
266 "indexedAt": "2025-01-01T00:00:00Z"
267 }
268 }],
269 "cursor": "author-cursor"
270 })),
271 )
272 .mount(mock_server)
273 .await;
274
275 Mock::given(method("GET"))
276 .and(path("/xrpc/app.bsky.feed.getActorLikes"))
277 .respond_with(
278 ResponseTemplate::new(200)
279 .insert_header("atproto-repo-rev", "0")
280 .set_body_json(json!({
281 "feed": [{
282 "post": {
283 "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post",
284 "cid": "bafyliked123",
285 "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"},
286 "record": {
287 "$type": "app.bsky.feed.post",
288 "text": "Liked post from appview",
289 "createdAt": "2025-01-01T00:00:00Z"
290 },
291 "indexedAt": "2025-01-01T00:00:00Z"
292 }
293 }],
294 "cursor": null
295 })),
296 )
297 .mount(mock_server)
298 .await;
299
300 Mock::given(method("GET"))
301 .and(path("/xrpc/app.bsky.feed.getPostThread"))
302 .respond_with(
303 ResponseTemplate::new(200)
304 .insert_header("atproto-repo-rev", "0")
305 .set_body_json(json!({
306 "thread": {
307 "$type": "app.bsky.feed.defs#threadViewPost",
308 "post": {
309 "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post",
310 "cid": "bafythread123",
311 "author": {"did": "did:plc:mock", "handle": "mock.handle"},
312 "record": {
313 "$type": "app.bsky.feed.post",
314 "text": "Thread post from appview",
315 "createdAt": "2025-01-01T00:00:00Z"
316 },
317 "indexedAt": "2025-01-01T00:00:00Z"
318 },
319 "replies": []
320 }
321 })),
322 )
323 .mount(mock_server)
324 .await;
325
326 Mock::given(method("GET"))
327 .and(path("/xrpc/app.bsky.feed.getFeed"))
328 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
329 "feed": [{
330 "post": {
331 "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post",
332 "cid": "bafyfeed123",
333 "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"},
334 "record": {
335 "$type": "app.bsky.feed.post",
336 "text": "Custom feed post from appview",
337 "createdAt": "2025-01-01T00:00:00Z"
338 },
339 "indexedAt": "2025-01-01T00:00:00Z"
340 }
341 }],
342 "cursor": null
343 })))
344 .mount(mock_server)
345 .await;
346
347 Mock::given(method("POST"))
348 .and(path("/xrpc/app.bsky.notification.registerPush"))
349 .respond_with(ResponseTemplate::new(200))
350 .mount(mock_server)
351 .await;
352}
353
354async fn spawn_app(database_url: String) -> String {
355 let pool = PgPoolOptions::new()
356 .max_connections(50)
357 .connect(&database_url)
358 .await
359 .expect("Failed to connect to Postgres. Make sure the database is running.");
360
361 sqlx::migrate!("./migrations")
362 .run(&pool)
363 .await
364 .expect("Failed to run migrations");
365
366 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
367 let addr = listener.local_addr().unwrap();
368 APP_PORT.set(addr.port()).ok();
369
370 unsafe {
371 std::env::set_var("PDS_HOSTNAME", addr.to_string());
372 }
373
374 let state = AppState::new(pool).await;
375
376 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
377
378 let app = bspds::app(state);
379
380 tokio::spawn(async move {
381 axum::serve(listener, app).await.unwrap();
382 });
383
384 format!("http://{}", addr)
385}
386
387#[allow(dead_code)]
388pub async fn get_db_connection_string() -> String {
389 base_url().await;
390
391 if has_external_infra() {
392 std::env::var("DATABASE_URL").expect("DATABASE_URL not set")
393 } else {
394 #[cfg(not(feature = "external-infra"))]
395 {
396 let container = DB_CONTAINER.get().expect("DB container not initialized");
397 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
398 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
399 }
400 #[cfg(feature = "external-infra")]
401 {
402 panic!("DATABASE_URL must be set with external-infra feature");
403 }
404 }
405}
406
407#[allow(dead_code)]
408pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
409 let res = client
410 .post(format!(
411 "{}/xrpc/com.atproto.repo.uploadBlob",
412 base_url().await
413 ))
414 .header(header::CONTENT_TYPE, mime)
415 .bearer_auth(AUTH_TOKEN)
416 .body(data)
417 .send()
418 .await
419 .expect("Failed to send uploadBlob request");
420
421 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
422 let body: Value = res.json().await.expect("Blob upload response was not JSON");
423 body["blob"].clone()
424}
425
426#[allow(dead_code)]
427pub async fn create_test_post(
428 client: &Client,
429 text: &str,
430 reply_to: Option<Value>,
431) -> (String, String, String) {
432 let collection = "app.bsky.feed.post";
433 let mut record = json!({
434 "$type": collection,
435 "text": text,
436 "createdAt": Utc::now().to_rfc3339()
437 });
438
439 if let Some(reply_obj) = reply_to {
440 record["reply"] = reply_obj;
441 }
442
443 let payload = json!({
444 "repo": AUTH_DID,
445 "collection": collection,
446 "record": record
447 });
448
449 let res = client
450 .post(format!(
451 "{}/xrpc/com.atproto.repo.createRecord",
452 base_url().await
453 ))
454 .bearer_auth(AUTH_TOKEN)
455 .json(&payload)
456 .send()
457 .await
458 .expect("Failed to send createRecord");
459
460 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
461 let body: Value = res
462 .json()
463 .await
464 .expect("createRecord response was not JSON");
465
466 let uri = body["uri"]
467 .as_str()
468 .expect("Response had no URI")
469 .to_string();
470 let cid = body["cid"]
471 .as_str()
472 .expect("Response had no CID")
473 .to_string();
474 let rkey = uri
475 .split('/')
476 .last()
477 .expect("URI was malformed")
478 .to_string();
479
480 (uri, cid, rkey)
481}
482
483#[allow(dead_code)]
484pub async fn create_account_and_login(client: &Client) -> (String, String) {
485 let mut last_error = String::new();
486
487 for attempt in 0..3 {
488 if attempt > 0 {
489 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
490 }
491
492 let handle = format!("user_{}", uuid::Uuid::new_v4());
493 let payload = json!({
494 "handle": handle,
495 "email": format!("{}@example.com", handle),
496 "password": "password"
497 });
498
499 let res = match client
500 .post(format!(
501 "{}/xrpc/com.atproto.server.createAccount",
502 base_url().await
503 ))
504 .json(&payload)
505 .send()
506 .await
507 {
508 Ok(r) => r,
509 Err(e) => {
510 last_error = format!("Request failed: {}", e);
511 continue;
512 }
513 };
514
515 if res.status() == StatusCode::OK {
516 let body: Value = res.json().await.expect("Invalid JSON");
517 let access_jwt = body["accessJwt"]
518 .as_str()
519 .expect("No accessJwt")
520 .to_string();
521 let did = body["did"].as_str().expect("No did").to_string();
522 return (access_jwt, did);
523 }
524
525 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
526 }
527
528 panic!("Failed to create account after 3 attempts: {}", last_error);
529}