this repo has no description
1use aws_config::BehaviorVersion;
2use aws_sdk_s3::Client as S3Client;
3use aws_sdk_s3::config::Credentials;
4use bspds::state::AppState;
5use chrono::Utc;
6use reqwest::{Client, StatusCode, header};
7use serde_json::{Value, json};
8use sqlx::postgres::PgPoolOptions;
9#[allow(unused_imports)]
10use std::collections::HashMap;
11use std::sync::OnceLock;
12#[allow(unused_imports)]
13use std::time::Duration;
14use testcontainers::core::ContainerPort;
15use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
16use testcontainers_modules::postgres::Postgres;
17use tokio::net::TcpListener;
18use wiremock::matchers::{method, path};
19use wiremock::{Mock, MockServer, ResponseTemplate};
20
21static SERVER_URL: OnceLock<String> = OnceLock::new();
22static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new();
23static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new();
24static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new();
25
26#[allow(dead_code)]
27pub const AUTH_TOKEN: &str = "test-token";
28#[allow(dead_code)]
29pub const BAD_AUTH_TOKEN: &str = "bad-token";
30#[allow(dead_code)]
31pub const AUTH_DID: &str = "did:plc:fake";
32#[allow(dead_code)]
33pub const TARGET_DID: &str = "did:plc:target";
34
35#[cfg(test)]
36#[ctor::dtor]
37fn cleanup() {
38 // my attempt to force clean up containers created by this test binary.
39 // this is a fallback in case ryuk fails or is not supported
40 if std::env::var("XDG_RUNTIME_DIR").is_ok() {
41 let _ = std::process::Command::new("podman")
42 .args(&["rm", "-f", "--filter", "label=bspds_test=true"])
43 .output();
44 }
45
46 let _ = std::process::Command::new("docker")
47 .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"])
48 .output();
49}
50
51#[allow(dead_code)]
52pub fn client() -> Client {
53 Client::new()
54}
55
56pub async fn base_url() -> &'static str {
57 SERVER_URL.get_or_init(|| {
58 let (tx, rx) = std::sync::mpsc::channel();
59
60 std::thread::spawn(move || {
61 if std::env::var("DOCKER_HOST").is_err() {
62 if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") {
63 let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
64 if podman_sock.exists() {
65 unsafe {
66 std::env::set_var(
67 "DOCKER_HOST",
68 format!("unix://{}", podman_sock.display()),
69 );
70 }
71 }
72 }
73 }
74
75 let rt = tokio::runtime::Runtime::new().unwrap();
76 rt.block_on(async move {
77 let s3_container = GenericImage::new("minio/minio", "latest")
78 .with_exposed_port(ContainerPort::Tcp(9000))
79 .with_env_var("MINIO_ROOT_USER", "minioadmin")
80 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin")
81 .with_cmd(vec!["server".to_string(), "/data".to_string()])
82 .with_label("bspds_test", "true")
83 .start()
84 .await
85 .expect("Failed to start MinIO");
86
87 let s3_port = s3_container
88 .get_host_port_ipv4(9000)
89 .await
90 .expect("Failed to get S3 port");
91 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port);
92
93 unsafe {
94 std::env::set_var("S3_BUCKET", "test-bucket");
95 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
96 std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
97 std::env::set_var("AWS_REGION", "us-east-1");
98 std::env::set_var("S3_ENDPOINT", &s3_endpoint);
99 }
100
101 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
102 .region("us-east-1")
103 .endpoint_url(&s3_endpoint)
104 .credentials_provider(Credentials::new(
105 "minioadmin",
106 "minioadmin",
107 None,
108 None,
109 "test",
110 ))
111 .load()
112 .await;
113
114 let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
115 .force_path_style(true)
116 .build();
117 let s3_client = S3Client::from_conf(s3_config);
118
119 let _ = s3_client.create_bucket().bucket("test-bucket").send().await;
120
121 let mock_server = MockServer::start().await;
122
123 Mock::given(method("GET"))
124 .and(path("/xrpc/app.bsky.actor.getProfile"))
125 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
126 "handle": "mock.handle",
127 "did": "did:plc:mock",
128 "displayName": "Mock User"
129 })))
130 .mount(&mock_server)
131 .await;
132
133 Mock::given(method("GET"))
134 .and(path("/xrpc/app.bsky.actor.searchActors"))
135 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
136 "actors": [],
137 "cursor": null
138 })))
139 .mount(&mock_server)
140 .await;
141
142 unsafe {
143 std::env::set_var("APPVIEW_URL", mock_server.uri());
144 }
145 MOCK_APPVIEW.set(mock_server).ok();
146
147 S3_CONTAINER.set(s3_container).ok();
148
149 let container = Postgres::default()
150 .with_tag("18-alpine")
151 .with_label("bspds_test", "true")
152 .start()
153 .await
154 .expect("Failed to start Postgres");
155 let connection_string = format!(
156 "postgres://postgres:postgres@127.0.0.1:{}/postgres",
157 container
158 .get_host_port_ipv4(5432)
159 .await
160 .expect("Failed to get port")
161 );
162
163 DB_CONTAINER.set(container).ok();
164
165 let url = spawn_app(connection_string).await;
166 tx.send(url).unwrap();
167 std::future::pending::<()>().await;
168 });
169 });
170
171 rx.recv().expect("Failed to start test server")
172 })
173}
174
175async fn spawn_app(database_url: String) -> String {
176 let pool = PgPoolOptions::new()
177 .max_connections(50)
178 .connect(&database_url)
179 .await
180 .expect("Failed to connect to Postgres. Make sure the database is running.");
181
182 sqlx::migrate!("./migrations")
183 .run(&pool)
184 .await
185 .expect("Failed to run migrations");
186
187 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
188 let addr = listener.local_addr().unwrap();
189
190 unsafe {
191 std::env::set_var("PDS_HOSTNAME", addr.to_string());
192 }
193
194 let state = AppState::new(pool).await;
195 let app = bspds::app(state);
196
197 tokio::spawn(async move {
198 axum::serve(listener, app).await.unwrap();
199 });
200
201 format!("http://{}", addr)
202}
203
204#[allow(dead_code)]
205pub async fn get_db_connection_string() -> String {
206 base_url().await;
207 let container = DB_CONTAINER.get().expect("DB container not initialized");
208 let port = container.get_host_port_ipv4(5432).await.expect("Failed to get port");
209 format!("postgres://postgres:postgres@127.0.0.1:{}/postgres", port)
210}
211
212#[allow(dead_code)]
213pub async fn upload_test_blob(client: &Client, data: &'static str, mime: &'static str) -> Value {
214 let res = client
215 .post(format!(
216 "{}/xrpc/com.atproto.repo.uploadBlob",
217 base_url().await
218 ))
219 .header(header::CONTENT_TYPE, mime)
220 .bearer_auth(AUTH_TOKEN)
221 .body(data)
222 .send()
223 .await
224 .expect("Failed to send uploadBlob request");
225
226 assert_eq!(res.status(), StatusCode::OK, "Failed to upload blob");
227 let body: Value = res.json().await.expect("Blob upload response was not JSON");
228 body["blob"].clone()
229}
230
231#[allow(dead_code)]
232pub async fn create_test_post(
233 client: &Client,
234 text: &str,
235 reply_to: Option<Value>,
236) -> (String, String, String) {
237 let collection = "app.bsky.feed.post";
238 let mut record = json!({
239 "$type": collection,
240 "text": text,
241 "createdAt": Utc::now().to_rfc3339()
242 });
243
244 if let Some(reply_obj) = reply_to {
245 record["reply"] = reply_obj;
246 }
247
248 let payload = json!({
249 "repo": AUTH_DID,
250 "collection": collection,
251 "record": record
252 });
253
254 let res = client
255 .post(format!(
256 "{}/xrpc/com.atproto.repo.createRecord",
257 base_url().await
258 ))
259 .bearer_auth(AUTH_TOKEN)
260 .json(&payload)
261 .send()
262 .await
263 .expect("Failed to send createRecord");
264
265 assert_eq!(res.status(), StatusCode::OK, "Failed to create post record");
266 let body: Value = res
267 .json()
268 .await
269 .expect("createRecord response was not JSON");
270
271 let uri = body["uri"]
272 .as_str()
273 .expect("Response had no URI")
274 .to_string();
275 let cid = body["cid"]
276 .as_str()
277 .expect("Response had no CID")
278 .to_string();
279 let rkey = uri
280 .split('/')
281 .last()
282 .expect("URI was malformed")
283 .to_string();
284
285 (uri, cid, rkey)
286}
287
288#[allow(dead_code)]
289pub async fn create_account_and_login(client: &Client) -> (String, String) {
290 let mut last_error = String::new();
291
292 for attempt in 0..3 {
293 if attempt > 0 {
294 tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
295 }
296
297 let handle = format!("user_{}", uuid::Uuid::new_v4());
298 let payload = json!({
299 "handle": handle,
300 "email": format!("{}@example.com", handle),
301 "password": "password"
302 });
303
304 let res = match client
305 .post(format!(
306 "{}/xrpc/com.atproto.server.createAccount",
307 base_url().await
308 ))
309 .json(&payload)
310 .send()
311 .await
312 {
313 Ok(r) => r,
314 Err(e) => {
315 last_error = format!("Request failed: {}", e);
316 continue;
317 }
318 };
319
320 if res.status() == StatusCode::OK {
321 let body: Value = res.json().await.expect("Invalid JSON");
322 let access_jwt = body["accessJwt"]
323 .as_str()
324 .expect("No accessJwt")
325 .to_string();
326 let did = body["did"].as_str().expect("No did").to_string();
327 return (access_jwt, did);
328 }
329
330 last_error = format!("Status {}: {:?}", res.status(), res.text().await);
331 }
332
333 panic!("Failed to create account after 3 attempts: {}", last_error);
334}