this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use testcontainers::core::ContainerPort;
15use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
16use testcontainers_modules::postgres::Postgres;
17use tokio::net::TcpListener;
18use wiremock::matchers::{method, path};
19use wiremock::{Mock, MockServer, ResponseTemplate};
20
21static SERVER_URL: OnceLock<String> = OnceLock::new();
22static APP_PORT: OnceLock<u16> = OnceLock::new();
23static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
24static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
25static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
26
27#[allow(dead_code)]
28pub const AUTH_TOKEN: &str = "test-token";
29#[allow(dead_code)]
30pub const BAD_AUTH_TOKEN: &str = "bad-token";
31#[allow(dead_code)]
32pub const AUTH_DID: &str = "did:plc:fake";
33#[allow(dead_code)]
34pub const TARGET_DID: &str = "did:plc:target";
35
36#[cfg(test)]
37#[ctor::dtor]
38fn cleanup() {
39 // my attempt to force clean up containers created by this test binary.
40 // this is a fallback in case ryuk fails or is not supported
41 if std::env::var("XDG_RUNTIME_DIR").is_ok() {
42 let _ = std::process::Command::new("podman")
43 .args(&["rm", "-f", "--filter", "label=bspds_test=true"])
44 .output();
45 }
46
47 let _ = std::process::Command::new("docker")
48 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"])
49 .output();
50}
51
52#[allow(dead_code)]
53pub fn client() -> Client {
54 Client::new()
55}
56
57#[allow(dead_code)]
58pub fn app_port() -> u16 {
59 *APP_PORT.get().expect("APP_PORT not initialized")
60}
61
62pub async fn base_url() -> &'static str {
63 SERVER_URL.get_or_init(|| {
64 let (tx, rx) = std::sync::mpsc::channel();
65
66 std::thread::spawn(move || {
67 if std::env::var("DOCKER_HOST").is_err() {
68 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
69 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
70 if podman_sock.exists() {
71 unsafe {
72 std::env::set_var(
73 "DOCKER_HOST",
74 format!("unix://{}", podman_sock.display()),
75 );
76 }
77 }
78 }
79 }
80
81 let rt = tokio::runtime::Runtime::new().unwrap();
82 rt.block_on(async move {
83 let s3_container = GenericImage::new("minio/minio", "latest")
84 .with_exposed_port(ContainerPort::Tcp(9000))
85 .with_env_var("MINIO_ROOT_USER", "minioadmin")
86 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
87 .with_cmd(vec!["server".to_string(), "/data".to_string()])
88 .with_label("bspds_test", "true")
89 .start()
90 .await
91 .expect("Failed to start MinIO");
92
93 let s3_port = s3_container
94 .get_host_port_ipv4(9000)
95 .await
96 .expect("Failed to get S3 port");
97 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
98
99 unsafe {
100 std::env::set_var("S3_BUCKET", "test-bucket");
101 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
102 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
103 std::env::set_var("AWS_REGION", "us-east-1");
104 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
105 }
106
107 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
108 .region("us-east-1")
109 .endpoint_url(&s3_endpoint)
110 .credentials_provider(Credentials::new(
111 "minioadmin",
112 "minioadmin",
113 None,
114 None,
115 "test",
116 ))
117 .load()
118 .await;
119
120 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
121 .force_path_style(true)
122 .build();
123 let s3_client = S3Client::from_conf(s3_config);
124
125 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
126
127 let mock_server = MockServer::start().await;
128
129 Mock::given(method("GET"))
130 .and(path("/xrpc/app.bsky.actor.getProfile"))
131 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
132 "handle": "mock.handle",
133 "did": "did:plc:mock",
134 "displayName": "Mock User"
135 })))
136 .mount(&mock_server)
137 .await;
138
139 Mock::given(method("GET"))
140 .and(path("/xrpc/app.bsky.actor.searchActors"))
141 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
142 "actors": [],
143 "cursor": null
144 })))
145 .mount(&mock_server)
146 .await;
147
148 unsafe {
149 std::env::set_var("APPVIEW_URL", mock_server.uri());
150 }
151 MOCK_APPVIEW.set(mock_server).ok();
152
153 S3_CONTAINER.set(s3_container).ok();
154
155 let container = Postgres::default()
156 .with_tag("18-alpine")
157 .with_label("bspds_test", "true")
158 .start()
159 .await
160 .expect("Failed to start Postgres");
161 let connection_string = format!(
162 "postgres://postgres:postgres@127.0.0.1:{}",
163 container
164 .get_host_port_ipv4(5432)
165 .await
166 .expect("Failed to get port")
167 );
168
169 DB_CONTAINER.set(container).ok();
170
171 let url = spawn_app(connection_string).await;
172 tx.send(url).unwrap();
173 std::future::pending::<()>().await;
174 });
175 });
176
177 rx.recv().expect("Failed to start test server")
178 })
179}
180
181async fn spawn_app(database_url: String) -> String {
182 let pool = PgPoolOptions::new()
183 .max_connections(50)
184 .connect(&database_url)
185 .await
186 .expect("Failed to connect to Postgres. Make sure the database is running.");
187
188 sqlx::migrate!("./migrations")
189 .run(&pool)
190 .await
191 .expect("Failed to run migrations");
192
193 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
194 let addr = listener.local_addr().unwrap();
195 APP_PORT.set(addr.port()).ok();
196
197 unsafe {
198 std::env::set_var("PDS_HOSTNAME", addr.to_string());
199 }
200
201 let state = AppState::new(pool).await;
202
203 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
204
205 let app = bspds::app(state);
206
207 tokio::spawn(async move {
208 axum::serve(listener, app).await.unwrap();
209 });
210
211 format!("http://{}", addr)
212}
213
214#[allow(dead_code)]
215pub async fn get_db_connection_string() -> String {
216 base_url().await;
217 let container = DB_CONTAINER.get().expect("DB container not initialized");
218 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
219 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
220}
221
222#[allow(dead_code)]
223pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
224 let res = client
225 .post(format!(
226 "{}/xrpc/com.atproto.repo.uploadBlob",
227 base_url().await
228 ))
229 .header(header::CONTENT_TYPE, mime)
230 .bearer_auth(AUTH_TOKEN)
231 .body(data)
232 .send()
233 .await
234 .expect("Failed to send uploadBlob request");
235
236 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
237 let body: Value = res.json().await.expect("Blob upload response was not JSON");
238 body["blob"].clone()
239}
240
241#[allow(dead_code)]
242pub async fn create_test_post(
243 client: &Client,
244 text: &str,
245 reply_to: Option<Value>,
246) -> (String, String, String) {
247 let collection = "app.bsky.feed.post";
248 let mut record = json!({
249 "$type": collection,
250 "text": text,
251 "createdAt": Utc::now().to_rfc3339()
252 });
253
254 if let Some(reply_obj) = reply_to {
255 record["reply"] = reply_obj;
256 }
257
258 let payload = json!({
259 "repo": AUTH_DID,
260 "collection": collection,
261 "record": record
262 });
263
264 let res = client
265 .post(format!(
266 "{}/xrpc/com.atproto.repo.createRecord",
267 base_url().await
268 ))
269 .bearer_auth(AUTH_TOKEN)
270 .json(&payload)
271 .send()
272 .await
273 .expect("Failed to send createRecord");
274
275 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
276 let body: Value = res
277 .json()
278 .await
279 .expect("createRecord response was not JSON");
280
281 let uri = body["uri"]
282 .as_str()
283 .expect("Response had no URI")
284 .to_string();
285 let cid = body["cid"]
286 .as_str()
287 .expect("Response had no CID")
288 .to_string();
289 let rkey = uri
290 .split('/')
291 .last()
292 .expect("URI was malformed")
293 .to_string();
294
295 (uri, cid, rkey)
296}
297
298#[allow(dead_code)]
299pub async fn create_account_and_login(client: &Client) -> (String, String) {
300 let mut last_error = String::new();
301
302 for attempt in 0..3 {
303 if attempt > 0 {
304 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
305 }
306
307 let handle = format!("user_{}", uuid::Uuid::new_v4());
308 let payload = json!({
309 "handle": handle,
310 "email": format!("{}@example.com", handle),
311 "password": "password"
312 });
313
314 let res = match client
315 .post(format!(
316 "{}/xrpc/com.atproto.server.createAccount",
317 base_url().await
318 ))
319 .json(&payload)
320 .send()
321 .await
322 {
323 Ok(r) => r,
324 Err(e) => {
325 last_error = format!("Request failed: {}", e);
326 continue;
327 }
328 };
329
330 if res.status() == StatusCode::OK {
331 let body: Value = res.json().await.expect("Invalid JSON");
332 let access_jwt = body["accessJwt"]
333 .as_str()
334 .expect("No accessJwt")
335 .to_string();
336 let did = body["did"].as_str().expect("No did").to_string();
337 return (access_jwt, did);
338 }
339
340 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
341 }
342
343 panic!("Failed to create account after 3 attempts: {}", last_error);
344}