···15JWT_SECRET=your-super-secret-jwt-key-please-change-me
16PDS_HOSTNAME=localhost:3000 # The public-facing hostname of the PDS
17PLC_URL=plc.directory
0
···15JWT_SECRET=your-super-secret-jwt-key-please-change-me
16PDS_HOSTNAME=localhost:3000 # The public-facing hostname of the PDS
17PLC_URL=plc.directory
18+APPVIEW_URL=https://api.bsky.app
···1+# Run all tests with correct threading models
2+test: test-proxy test-lifecycle test-others
3+4+# Proxy tests modify environment variables, so must run single-threaded
5+# TODO: figure out how to run in parallel
6+test-proxy:
7+ cargo test --test proxy -- --test-threads=1
8+9+# Lifecycle tests involve complex state mutations, run single-threaded to be safe
10+# TODO: figure out how to run in parallel
11+test-lifecycle:
12+ cargo test --test lifecycle -- --test-threads=1
13+14+test-others:
15+ cargo test --lib
16+ cargo test --test actor
17+ cargo test --test feed
18+ cargo test --test graph
19+ cargo test --test identity
20+ cargo test --test notification
21+ cargo test --test repo
22+ cargo test --test server
23+ cargo test --test sync
···1+mod common;
2+3+use axum::{
4+ routing::any,
5+ Router,
6+ extract::Request,
7+ http::StatusCode,
8+};
9+use tokio::net::TcpListener;
10+use reqwest::Client;
11+use std::sync::Arc;
12+13+async fn spawn_mock_upstream() -> (String, tokio::sync::mpsc::Receiver<(String, String, Option<String>)>) {
14+ let (tx, rx) = tokio::sync::mpsc::channel(10);
15+ let tx = Arc::new(tx);
16+17+ let app = Router::new().fallback(any(move |req: Request| {
18+ let tx = tx.clone();
19+ async move {
20+ let method = req.method().to_string();
21+ let uri = req.uri().to_string();
22+ let auth = req.headers().get("Authorization")
23+ .and_then(|h| h.to_str().ok())
24+ .map(|s| s.to_string());
25+26+ let _ = tx.send((method, uri, auth)).await;
27+ (StatusCode::OK, "Mock Response")
28+ }
29+ }));
30+31+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
32+ let addr = listener.local_addr().unwrap();
33+34+ tokio::spawn(async move {
35+ axum::serve(listener, app).await.unwrap();
36+ });
37+38+ (format!("http://{}", addr), rx)
39+}
40+41+#[tokio::test]
42+async fn test_proxy_via_header() {
43+ let app_url = common::base_url().await;
44+ let (upstream_url, mut rx) = spawn_mock_upstream().await;
45+ let client = Client::new();
46+47+ let res = client.get(format!("{}/xrpc/com.example.test", app_url))
48+ .header("atproto-proxy", &upstream_url)
49+ .header("Authorization", "Bearer test-token")
50+ .send()
51+ .await
52+ .unwrap();
53+54+ assert_eq!(res.status(), StatusCode::OK);
55+56+ let (method, uri, auth) = rx.recv().await.expect("Upstream should receive request");
57+ assert_eq!(method, "GET");
58+ assert_eq!(uri, "/xrpc/com.example.test");
59+ assert_eq!(auth, Some("Bearer test-token".to_string()));
60+}
61+62+#[tokio::test]
63+async fn test_proxy_via_env_var() {
64+ let (upstream_url, mut rx) = spawn_mock_upstream().await;
65+66+ unsafe { std::env::set_var("APPVIEW_URL", &upstream_url); }
67+68+ let app_url = common::base_url().await;
69+ let client = Client::new();
70+71+ let res = client.get(format!("{}/xrpc/com.example.envtest", app_url))
72+ .send()
73+ .await
74+ .unwrap();
75+76+ assert_eq!(res.status(), StatusCode::OK);
77+78+ let (method, uri, _) = rx.recv().await.expect("Upstream should receive request");
79+ assert_eq!(method, "GET");
80+ assert_eq!(uri, "/xrpc/com.example.envtest");
81+}
82+83+#[tokio::test]
84+async fn test_proxy_missing_config() {
85+ unsafe { std::env::remove_var("APPVIEW_URL"); }
86+87+ let app_url = common::base_url().await;
88+ let client = Client::new();
89+90+ let res = client.get(format!("{}/xrpc/com.example.fail", app_url))
91+ .send()
92+ .await
93+ .unwrap();
94+95+ assert_eq!(res.status(), StatusCode::BAD_GATEWAY);
96+}
+16-16
tests/repo.rs
···15 ("rkey", "self"),
16 ];
1718- let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", BASE_URL))
19 .query(¶ms)
20 .send()
21 .await
···36 ("rkey", "nonexistent"),
37 ];
3839- let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", BASE_URL))
40 .query(¶ms)
41 .send()
42 .await
···51#[ignore]
52async fn test_upload_blob_no_auth() {
53 let client = client();
54- let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", BASE_URL))
55 .header(header::CONTENT_TYPE, "text/plain")
56 .body("no auth")
57 .send()
···68async fn test_upload_blob_success() {
69 let client = client();
70 let (token, _) = create_account_and_login(&client).await;
71- let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", BASE_URL))
72 .header(header::CONTENT_TYPE, "text/plain")
73 .bearer_auth(token)
74 .body("This is our blob data")
···92 "record": {}
93 });
9495- let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", BASE_URL))
96 .json(&payload)
97 .send()
98 .await
···120 }
121 });
122123- let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", BASE_URL))
124 .bearer_auth(token)
125 .json(&payload)
126 .send()
···142 ("repo", "did:plc:12345"),
143 ];
144145- let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", BASE_URL))
146 .query(¶ms)
147 .send()
148 .await
···156#[ignore]
157async fn test_upload_blob_bad_token() {
158 let client = client();
159- let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", BASE_URL))
160 .header(header::CONTENT_TYPE, "text/plain")
161 .bearer_auth(BAD_AUTH_TOKEN)
162 .body("This is our blob data")
···187 }
188 });
189190- let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", BASE_URL))
191 .bearer_auth(token)
192 .json(&payload)
193 .send()
···215 }
216 });
217218- let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", BASE_URL))
219 .bearer_auth(token)
220 .json(&payload)
221 .send()
···231async fn test_upload_blob_unsupported_mime_type() {
232 let client = client();
233 let (token, _) = create_account_and_login(&client).await;
234- let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", BASE_URL))
235 .header(header::CONTENT_TYPE, "application/xml")
236 .bearer_auth(token)
237 .body("<xml>not an image</xml>")
···252 ("collection", "app.bsky.feed.post"),
253 ("limit", "10"),
254 ];
255- let res = client.get(format!("{}/xrpc/com.atproto.repo.listRecords", BASE_URL))
256 .query(¶ms)
257 .send()
258 .await
···270 "collection": "app.bsky.feed.post",
271 "rkey": "some_post_to_delete"
272 });
273- let res = client.post(format!("{}/xrpc/com.atproto.repo.deleteRecord", BASE_URL))
274 .bearer_auth(token)
275 .json(&payload)
276 .send()
···287 let params = [
288 ("repo", did.as_str()),
289 ];
290- let res = client.get(format!("{}/xrpc/com.atproto.repo.describeRepo", BASE_URL))
291 .query(¶ms)
292 .send()
293 .await
···310 }
311 });
312313- let res = client.post(format!("{}/xrpc/com.atproto.repo.createRecord", BASE_URL))
314 .json(&payload)
315 .bearer_auth(token) // Assuming auth is required
316 .send()
···340 }
341 });
342343- let res = client.post(format!("{}/xrpc/com.atproto.repo.createRecord", BASE_URL))
344 .json(&payload)
345 .bearer_auth(token) // Assuming auth is required
346 .send()
···15 ("rkey", "self"),
16 ];
1718+ let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", base_url().await))
19 .query(¶ms)
20 .send()
21 .await
···36 ("rkey", "nonexistent"),
37 ];
3839+ let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", base_url().await))
40 .query(¶ms)
41 .send()
42 .await
···51#[ignore]
52async fn test_upload_blob_no_auth() {
53 let client = client();
54+ let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url().await))
55 .header(header::CONTENT_TYPE, "text/plain")
56 .body("no auth")
57 .send()
···68async fn test_upload_blob_success() {
69 let client = client();
70 let (token, _) = create_account_and_login(&client).await;
71+ let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url().await))
72 .header(header::CONTENT_TYPE, "text/plain")
73 .bearer_auth(token)
74 .body("This is our blob data")
···92 "record": {}
93 });
9495+ let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", base_url().await))
96 .json(&payload)
97 .send()
98 .await
···120 }
121 });
122123+ let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", base_url().await))
124 .bearer_auth(token)
125 .json(&payload)
126 .send()
···142 ("repo", "did:plc:12345"),
143 ];
144145+ let res = client.get(format!("{}/xrpc/com.atproto.repo.getRecord", base_url().await))
146 .query(¶ms)
147 .send()
148 .await
···156#[ignore]
157async fn test_upload_blob_bad_token() {
158 let client = client();
159+ let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url().await))
160 .header(header::CONTENT_TYPE, "text/plain")
161 .bearer_auth(BAD_AUTH_TOKEN)
162 .body("This is our blob data")
···187 }
188 });
189190+ let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", base_url().await))
191 .bearer_auth(token)
192 .json(&payload)
193 .send()
···215 }
216 });
217218+ let res = client.post(format!("{}/xrpc/com.atproto.repo.putRecord", base_url().await))
219 .bearer_auth(token)
220 .json(&payload)
221 .send()
···231async fn test_upload_blob_unsupported_mime_type() {
232 let client = client();
233 let (token, _) = create_account_and_login(&client).await;
234+ let res = client.post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base_url().await))
235 .header(header::CONTENT_TYPE, "application/xml")
236 .bearer_auth(token)
237 .body("<xml>not an image</xml>")
···252 ("collection", "app.bsky.feed.post"),
253 ("limit", "10"),
254 ];
255+ let res = client.get(format!("{}/xrpc/com.atproto.repo.listRecords", base_url().await))
256 .query(¶ms)
257 .send()
258 .await
···270 "collection": "app.bsky.feed.post",
271 "rkey": "some_post_to_delete"
272 });
273+ let res = client.post(format!("{}/xrpc/com.atproto.repo.deleteRecord", base_url().await))
274 .bearer_auth(token)
275 .json(&payload)
276 .send()
···287 let params = [
288 ("repo", did.as_str()),
289 ];
290+ let res = client.get(format!("{}/xrpc/com.atproto.repo.describeRepo", base_url().await))
291 .query(¶ms)
292 .send()
293 .await
···310 }
311 });
312313+ let res = client.post(format!("{}/xrpc/com.atproto.repo.createRecord", base_url().await))
314 .json(&payload)
315 .bearer_auth(token) // Assuming auth is required
316 .send()
···340 }
341 });
342343+ let res = client.post(format!("{}/xrpc/com.atproto.repo.createRecord", base_url().await))
344 .json(&payload)
345 .bearer_auth(token) // Assuming auth is required
346 .send()
+11-11
tests/server.rs
···7#[tokio::test]
8async fn test_health() {
9 let client = client();
10- let res = client.get(format!("{}/health", BASE_URL))
11 .send()
12 .await
13 .expect("Failed to send request");
···19#[tokio::test]
20async fn test_describe_server() {
21 let client = client();
22- let res = client.get(format!("{}/xrpc/com.atproto.server.describeServer", BASE_URL))
23 .send()
24 .await
25 .expect("Failed to send request");
···39 "email": format!("{}@example.com", handle),
40 "password": "password"
41 });
42- let _ = client.post(format!("{}/xrpc/com.atproto.server.createAccount", BASE_URL))
43 .json(&payload)
44 .send()
45 .await;
···49 "password": "password"
50 });
5152- let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", BASE_URL))
53 .json(&payload)
54 .send()
55 .await
···67 "password": "password"
68 });
6970- let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", BASE_URL))
71 .json(&payload)
72 .send()
73 .await
···86 "password": "password"
87 });
8889- let res = client.post(format!("{}/xrpc/com.atproto.server.createAccount", BASE_URL))
90 .json(&payload)
91 .send()
92 .await
···98#[tokio::test]
99async fn test_get_session() {
100 let client = client();
101- let res = client.get(format!("{}/xrpc/com.atproto.server.getSession", BASE_URL))
102 .bearer_auth(AUTH_TOKEN)
103 .send()
104 .await
···117 "email": format!("{}@example.com", handle),
118 "password": "password"
119 });
120- let _ = client.post(format!("{}/xrpc/com.atproto.server.createAccount", BASE_URL))
121 .json(&payload)
122 .send()
123 .await;
···126 "identifier": handle,
127 "password": "password"
128 });
129- let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", BASE_URL))
130 .json(&login_payload)
131 .send()
132 .await
···137 let refresh_jwt = body["refreshJwt"].as_str().expect("No refreshJwt").to_string();
138 let access_jwt = body["accessJwt"].as_str().expect("No accessJwt").to_string();
139140- let res = client.post(format!("{}/xrpc/com.atproto.server.refreshSession", BASE_URL))
141 .bearer_auth(&refresh_jwt)
142 .send()
143 .await
···154#[tokio::test]
155async fn test_delete_session() {
156 let client = client();
157- let res = client.post(format!("{}/xrpc/com.atproto.server.deleteSession", BASE_URL))
158 .bearer_auth(AUTH_TOKEN)
159 .send()
160 .await
···7#[tokio::test]
8async fn test_health() {
9 let client = client();
10+ let res = client.get(format!("{}/health", base_url().await))
11 .send()
12 .await
13 .expect("Failed to send request");
···19#[tokio::test]
20async fn test_describe_server() {
21 let client = client();
22+ let res = client.get(format!("{}/xrpc/com.atproto.server.describeServer", base_url().await))
23 .send()
24 .await
25 .expect("Failed to send request");
···39 "email": format!("{}@example.com", handle),
40 "password": "password"
41 });
42+ let _ = client.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url().await))
43 .json(&payload)
44 .send()
45 .await;
···49 "password": "password"
50 });
5152+ let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", base_url().await))
53 .json(&payload)
54 .send()
55 .await
···67 "password": "password"
68 });
6970+ let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", base_url().await))
71 .json(&payload)
72 .send()
73 .await
···86 "password": "password"
87 });
8889+ let res = client.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url().await))
90 .json(&payload)
91 .send()
92 .await
···98#[tokio::test]
99async fn test_get_session() {
100 let client = client();
101+ let res = client.get(format!("{}/xrpc/com.atproto.server.getSession", base_url().await))
102 .bearer_auth(AUTH_TOKEN)
103 .send()
104 .await
···117 "email": format!("{}@example.com", handle),
118 "password": "password"
119 });
120+ let _ = client.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url().await))
121 .json(&payload)
122 .send()
123 .await;
···126 "identifier": handle,
127 "password": "password"
128 });
129+ let res = client.post(format!("{}/xrpc/com.atproto.server.createSession", base_url().await))
130 .json(&login_payload)
131 .send()
132 .await
···137 let refresh_jwt = body["refreshJwt"].as_str().expect("No refreshJwt").to_string();
138 let access_jwt = body["accessJwt"].as_str().expect("No accessJwt").to_string();
139140+ let res = client.post(format!("{}/xrpc/com.atproto.server.refreshSession", base_url().await))
141 .bearer_auth(&refresh_jwt)
142 .send()
143 .await
···154#[tokio::test]
155async fn test_delete_session() {
156 let client = client();
157+ let res = client.post(format!("{}/xrpc/com.atproto.server.deleteSession", base_url().await))
158 .bearer_auth(AUTH_TOKEN)
159 .send()
160 .await
+2-2
tests/sync.rs
···8 let params = [
9 ("did", AUTH_DID),
10 ];
11- let res = client.get(format!("{}/xrpc/com.atproto.sync.getRepo", BASE_URL))
12 .query(¶ms)
13 .send()
14 .await
···24 ("did", AUTH_DID),
25 // "cids" would be a list of CIDs
26 ];
27- let res = client.get(format!("{}/xrpc/com.atproto.sync.getBlocks", BASE_URL))
28 .query(¶ms)
29 .send()
30 .await
···8 let params = [
9 ("did", AUTH_DID),
10 ];
11+ let res = client.get(format!("{}/xrpc/com.atproto.sync.getRepo", base_url().await))
12 .query(¶ms)
13 .send()
14 .await
···24 ("did", AUTH_DID),
25 // "cids" would be a list of CIDs
26 ];
27+ let res = client.get(format!("{}/xrpc/com.atproto.sync.getBlocks", base_url().await))
28 .query(¶ms)
29 .send()
30 .await