//! Integration tests for PLC operation signing/submission and repository import, run against mocked PLC directory endpoints.
1mod common;
2use common::*;
3use k256::ecdsa::SigningKey;
4use reqwest::StatusCode;
5use serde_json::{json, Value};
6use sqlx::PgPool;
7use wiremock::matchers::{method, path};
8use wiremock::{Mock, MockServer, ResponseTemplate};
/// Encodes `x` as an unsigned varint: seven payload bits per byte,
/// least-significant group first, with the high bit set on every byte except
/// the final one.
fn encode_uvarint(mut x: u64) -> Vec<u8> {
    let mut encoded = Vec::new();
    loop {
        // Masking first makes the narrowing cast lossless.
        let low_bits = (x & 0x7F) as u8;
        x >>= 7;
        if x == 0 {
            // Last group: emitted without the continuation bit.
            encoded.push(low_bits);
            return encoded;
        }
        encoded.push(low_bits | 0x80);
    }
}
18fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
19 let verifying_key = signing_key.verifying_key();
20 let point = verifying_key.to_encoded_point(true);
21 let compressed_bytes = point.as_bytes();
22 let mut prefixed = vec![0xe7, 0x01];
23 prefixed.extend_from_slice(compressed_bytes);
24 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
25 format!("did:key:{}", encoded)
26}
27fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String {
28 let public_key = signing_key.verifying_key();
29 let compressed = public_key.to_sec1_bytes();
30 let mut buf = encode_uvarint(0xE7);
31 buf.extend_from_slice(&compressed);
32 multibase::encode(multibase::Base::Base58Btc, buf)
33}
34async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> {
35 let db_url = get_db_connection_string().await;
36 let pool = PgPool::connect(&db_url).await.ok()?;
37 let row = sqlx::query!(
38 r#"
39 SELECT k.key_bytes, k.encryption_version
40 FROM user_keys k
41 JOIN users u ON k.user_id = u.id
42 WHERE u.did = $1
43 "#,
44 did
45 )
46 .fetch_optional(&pool)
47 .await
48 .ok()??;
49 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok()
50}
51async fn get_plc_token_from_db(did: &str) -> Option<String> {
52 let db_url = get_db_connection_string().await;
53 let pool = PgPool::connect(&db_url).await.ok()?;
54 sqlx::query_scalar!(
55 r#"
56 SELECT t.token
57 FROM plc_operation_tokens t
58 JOIN users u ON t.user_id = u.id
59 WHERE u.did = $1
60 "#,
61 did
62 )
63 .fetch_optional(&pool)
64 .await
65 .ok()?
66}
67async fn get_user_handle(did: &str) -> Option<String> {
68 let db_url = get_db_connection_string().await;
69 let pool = PgPool::connect(&db_url).await.ok()?;
70 sqlx::query_scalar!(
71 r#"SELECT handle FROM users WHERE did = $1"#,
72 did
73 )
74 .fetch_optional(&pool)
75 .await
76 .ok()?
77}
78fn create_mock_last_op(
79 _did: &str,
80 handle: &str,
81 signing_key: &SigningKey,
82 pds_endpoint: &str,
83) -> Value {
84 let did_key = signing_key_to_did_key(signing_key);
85 json!({
86 "type": "plc_operation",
87 "rotationKeys": [did_key.clone()],
88 "verificationMethods": {
89 "atproto": did_key
90 },
91 "alsoKnownAs": [format!("at://{}", handle)],
92 "services": {
93 "atproto_pds": {
94 "type": "AtprotoPersonalDataServer",
95 "endpoint": pds_endpoint
96 }
97 },
98 "prev": null,
99 "sig": "mock_signature_for_testing"
100 })
101}
102fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value {
103 let multikey = get_multikey_from_signing_key(signing_key);
104 json!({
105 "@context": [
106 "https://www.w3.org/ns/did/v1",
107 "https://w3id.org/security/multikey/v1"
108 ],
109 "id": did,
110 "alsoKnownAs": [format!("at://{}", handle)],
111 "verificationMethod": [{
112 "id": format!("{}#atproto", did),
113 "type": "Multikey",
114 "controller": did,
115 "publicKeyMultibase": multikey
116 }],
117 "service": [{
118 "id": "#atproto_pds",
119 "type": "AtprotoPersonalDataServer",
120 "serviceEndpoint": pds_endpoint
121 }]
122 })
123}
124async fn setup_mock_plc_for_sign(
125 did: &str,
126 handle: &str,
127 signing_key: &SigningKey,
128 pds_endpoint: &str,
129) -> MockServer {
130 let mock_server = MockServer::start().await;
131 let did_encoded = urlencoding::encode(did);
132 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint);
133 Mock::given(method("GET"))
134 .and(path(format!("/{}/log/last", did_encoded)))
135 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
136 .mount(&mock_server)
137 .await;
138 mock_server
139}
140async fn setup_mock_plc_for_submit(
141 did: &str,
142 handle: &str,
143 signing_key: &SigningKey,
144 pds_endpoint: &str,
145) -> MockServer {
146 let mock_server = MockServer::start().await;
147 let did_encoded = urlencoding::encode(did);
148 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint);
149 Mock::given(method("GET"))
150 .and(path(format!("/{}", did_encoded)))
151 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone()))
152 .mount(&mock_server)
153 .await;
154 Mock::given(method("POST"))
155 .and(path(format!("/{}", did_encoded)))
156 .respond_with(ResponseTemplate::new(200))
157 .mount(&mock_server)
158 .await;
159 mock_server
160}
161#[tokio::test]
162#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
163async fn test_full_plc_operation_flow() {
164 let client = client();
165 let (token, did) = create_account_and_login(&client).await;
166 let key_bytes = get_user_signing_key(&did).await
167 .expect("Failed to get user signing key");
168 let signing_key = SigningKey::from_slice(&key_bytes)
169 .expect("Failed to create signing key");
170 let handle = get_user_handle(&did).await
171 .expect("Failed to get user handle");
172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
173 let pds_endpoint = format!("https://{}", hostname);
174 let request_res = client
175 .post(format!(
176 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
177 base_url().await
178 ))
179 .bearer_auth(&token)
180 .send()
181 .await
182 .expect("Request failed");
183 assert_eq!(request_res.status(), StatusCode::OK);
184 let plc_token = get_plc_token_from_db(&did).await
185 .expect("PLC token not found in database");
186 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
187 unsafe {
188 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
189 }
190 let sign_res = client
191 .post(format!(
192 "{}/xrpc/com.atproto.identity.signPlcOperation",
193 base_url().await
194 ))
195 .bearer_auth(&token)
196 .json(&json!({
197 "token": plc_token
198 }))
199 .send()
200 .await
201 .expect("Sign request failed");
202 let sign_status = sign_res.status();
203 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
204 assert_eq!(
205 sign_status,
206 StatusCode::OK,
207 "Sign PLC operation should succeed. Response: {:?}",
208 sign_body
209 );
210 let operation = sign_body.get("operation")
211 .expect("Response should contain operation");
212 assert!(operation.get("sig").is_some(), "Operation should be signed");
213 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation"));
214 assert!(operation.get("prev").is_some(), "Operation should have prev reference");
215}
216#[tokio::test]
217#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"]
218async fn test_sign_plc_operation_consumes_token() {
219 let client = client();
220 let (token, did) = create_account_and_login(&client).await;
221 let key_bytes = get_user_signing_key(&did).await
222 .expect("Failed to get user signing key");
223 let signing_key = SigningKey::from_slice(&key_bytes)
224 .expect("Failed to create signing key");
225 let handle = get_user_handle(&did).await
226 .expect("Failed to get user handle");
227 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
228 let pds_endpoint = format!("https://{}", hostname);
229 let request_res = client
230 .post(format!(
231 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
232 base_url().await
233 ))
234 .bearer_auth(&token)
235 .send()
236 .await
237 .expect("Request failed");
238 assert_eq!(request_res.status(), StatusCode::OK);
239 let plc_token = get_plc_token_from_db(&did).await
240 .expect("PLC token not found in database");
241 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
242 unsafe {
243 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
244 }
245 let sign_res = client
246 .post(format!(
247 "{}/xrpc/com.atproto.identity.signPlcOperation",
248 base_url().await
249 ))
250 .bearer_auth(&token)
251 .json(&json!({
252 "token": plc_token
253 }))
254 .send()
255 .await
256 .expect("Sign request failed");
257 assert_eq!(sign_res.status(), StatusCode::OK);
258 let sign_res_2 = client
259 .post(format!(
260 "{}/xrpc/com.atproto.identity.signPlcOperation",
261 base_url().await
262 ))
263 .bearer_auth(&token)
264 .json(&json!({
265 "token": plc_token
266 }))
267 .send()
268 .await
269 .expect("Second sign request failed");
270 assert_eq!(
271 sign_res_2.status(),
272 StatusCode::BAD_REQUEST,
273 "Using the same token twice should fail"
274 );
275 let body: Value = sign_res_2.json().await.unwrap();
276 assert!(
277 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken",
278 "Error should indicate invalid/expired token"
279 );
280}
281#[tokio::test]
282async fn test_sign_plc_operation_with_custom_fields() {
283 let client = client();
284 let (token, did) = create_account_and_login(&client).await;
285 let key_bytes = get_user_signing_key(&did).await
286 .expect("Failed to get user signing key");
287 let signing_key = SigningKey::from_slice(&key_bytes)
288 .expect("Failed to create signing key");
289 let handle = get_user_handle(&did).await
290 .expect("Failed to get user handle");
291 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
292 let pds_endpoint = format!("https://{}", hostname);
293 let request_res = client
294 .post(format!(
295 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
296 base_url().await
297 ))
298 .bearer_auth(&token)
299 .send()
300 .await
301 .expect("Request failed");
302 assert_eq!(request_res.status(), StatusCode::OK);
303 let plc_token = get_plc_token_from_db(&did).await
304 .expect("PLC token not found in database");
305 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
306 unsafe {
307 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
308 }
309 let did_key = signing_key_to_did_key(&signing_key);
310 let sign_res = client
311 .post(format!(
312 "{}/xrpc/com.atproto.identity.signPlcOperation",
313 base_url().await
314 ))
315 .bearer_auth(&token)
316 .json(&json!({
317 "token": plc_token,
318 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"],
319 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"]
320 }))
321 .send()
322 .await
323 .expect("Sign request failed");
324 let sign_status = sign_res.status();
325 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
326 assert_eq!(
327 sign_status,
328 StatusCode::OK,
329 "Sign with custom fields should succeed. Response: {:?}",
330 sign_body
331 );
332 let operation = sign_body.get("operation").expect("Should have operation");
333 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array());
334 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array());
335 assert!(also_known_as.is_some(), "Should have alsoKnownAs");
336 assert!(rotation_keys.is_some(), "Should have rotationKeys");
337 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases");
338 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys");
339}
340#[tokio::test]
341#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
342async fn test_submit_plc_operation_success() {
343 let client = client();
344 let (token, did) = create_account_and_login(&client).await;
345 let key_bytes = get_user_signing_key(&did).await
346 .expect("Failed to get user signing key");
347 let signing_key = SigningKey::from_slice(&key_bytes)
348 .expect("Failed to create signing key");
349 let handle = get_user_handle(&did).await
350 .expect("Failed to get user handle");
351 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
352 let pds_endpoint = format!("https://{}", hostname);
353 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
354 unsafe {
355 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
356 }
357 let did_key = signing_key_to_did_key(&signing_key);
358 let operation = json!({
359 "type": "plc_operation",
360 "rotationKeys": [did_key.clone()],
361 "verificationMethods": {
362 "atproto": did_key.clone()
363 },
364 "alsoKnownAs": [format!("at://{}", handle)],
365 "services": {
366 "atproto_pds": {
367 "type": "AtprotoPersonalDataServer",
368 "endpoint": pds_endpoint
369 }
370 },
371 "prev": "bafyreiabc123",
372 "sig": "test_signature_base64"
373 });
374 let submit_res = client
375 .post(format!(
376 "{}/xrpc/com.atproto.identity.submitPlcOperation",
377 base_url().await
378 ))
379 .bearer_auth(&token)
380 .json(&json!({ "operation": operation }))
381 .send()
382 .await
383 .expect("Submit request failed");
384 let submit_status = submit_res.status();
385 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
386 assert_eq!(
387 submit_status,
388 StatusCode::OK,
389 "Submit PLC operation should succeed. Response: {:?}",
390 submit_body
391 );
392}
393#[tokio::test]
394async fn test_submit_plc_operation_wrong_endpoint_rejected() {
395 let client = client();
396 let (token, did) = create_account_and_login(&client).await;
397 let key_bytes = get_user_signing_key(&did).await
398 .expect("Failed to get user signing key");
399 let signing_key = SigningKey::from_slice(&key_bytes)
400 .expect("Failed to create signing key");
401 let handle = get_user_handle(&did).await
402 .expect("Failed to get user handle");
403 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
404 let pds_endpoint = format!("https://{}", hostname);
405 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
406 unsafe {
407 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
408 }
409 let did_key = signing_key_to_did_key(&signing_key);
410 let operation = json!({
411 "type": "plc_operation",
412 "rotationKeys": [did_key.clone()],
413 "verificationMethods": {
414 "atproto": did_key.clone()
415 },
416 "alsoKnownAs": [format!("at://{}", handle)],
417 "services": {
418 "atproto_pds": {
419 "type": "AtprotoPersonalDataServer",
420 "endpoint": "https://wrong-pds.example.com"
421 }
422 },
423 "prev": "bafyreiabc123",
424 "sig": "test_signature_base64"
425 });
426 let submit_res = client
427 .post(format!(
428 "{}/xrpc/com.atproto.identity.submitPlcOperation",
429 base_url().await
430 ))
431 .bearer_auth(&token)
432 .json(&json!({ "operation": operation }))
433 .send()
434 .await
435 .expect("Submit request failed");
436 assert_eq!(
437 submit_res.status(),
438 StatusCode::BAD_REQUEST,
439 "Submit with wrong endpoint should fail"
440 );
441 let body: Value = submit_res.json().await.unwrap();
442 assert_eq!(body["error"], "InvalidRequest");
443}
444#[tokio::test]
445async fn test_submit_plc_operation_wrong_signing_key_rejected() {
446 let client = client();
447 let (token, did) = create_account_and_login(&client).await;
448 let key_bytes = get_user_signing_key(&did).await
449 .expect("Failed to get user signing key");
450 let signing_key = SigningKey::from_slice(&key_bytes)
451 .expect("Failed to create signing key");
452 let handle = get_user_handle(&did).await
453 .expect("Failed to get user handle");
454 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
455 let pds_endpoint = format!("https://{}", hostname);
456 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
457 unsafe {
458 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
459 }
460 let wrong_key = SigningKey::random(&mut rand::thread_rng());
461 let wrong_did_key = signing_key_to_did_key(&wrong_key);
462 let correct_did_key = signing_key_to_did_key(&signing_key);
463 let operation = json!({
464 "type": "plc_operation",
465 "rotationKeys": [correct_did_key.clone()],
466 "verificationMethods": {
467 "atproto": wrong_did_key
468 },
469 "alsoKnownAs": [format!("at://{}", handle)],
470 "services": {
471 "atproto_pds": {
472 "type": "AtprotoPersonalDataServer",
473 "endpoint": pds_endpoint
474 }
475 },
476 "prev": "bafyreiabc123",
477 "sig": "test_signature_base64"
478 });
479 let submit_res = client
480 .post(format!(
481 "{}/xrpc/com.atproto.identity.submitPlcOperation",
482 base_url().await
483 ))
484 .bearer_auth(&token)
485 .json(&json!({ "operation": operation }))
486 .send()
487 .await
488 .expect("Submit request failed");
489 assert_eq!(
490 submit_res.status(),
491 StatusCode::BAD_REQUEST,
492 "Submit with wrong signing key should fail"
493 );
494 let body: Value = submit_res.json().await.unwrap();
495 assert_eq!(body["error"], "InvalidRequest");
496}
497#[tokio::test]
498async fn test_full_sign_and_submit_flow() {
499 let client = client();
500 let (token, did) = create_account_and_login(&client).await;
501 let key_bytes = get_user_signing_key(&did).await
502 .expect("Failed to get user signing key");
503 let signing_key = SigningKey::from_slice(&key_bytes)
504 .expect("Failed to create signing key");
505 let handle = get_user_handle(&did).await
506 .expect("Failed to get user handle");
507 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
508 let pds_endpoint = format!("https://{}", hostname);
509 let request_res = client
510 .post(format!(
511 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
512 base_url().await
513 ))
514 .bearer_auth(&token)
515 .send()
516 .await
517 .expect("Request failed");
518 assert_eq!(request_res.status(), StatusCode::OK);
519 let plc_token = get_plc_token_from_db(&did).await
520 .expect("PLC token not found");
521 let mock_server = MockServer::start().await;
522 let did_encoded = urlencoding::encode(&did);
523 let did_key = signing_key_to_did_key(&signing_key);
524 let last_op = json!({
525 "type": "plc_operation",
526 "rotationKeys": [did_key.clone()],
527 "verificationMethods": {
528 "atproto": did_key.clone()
529 },
530 "alsoKnownAs": [format!("at://{}", handle)],
531 "services": {
532 "atproto_pds": {
533 "type": "AtprotoPersonalDataServer",
534 "endpoint": pds_endpoint.clone()
535 }
536 },
537 "prev": null,
538 "sig": "initial_sig"
539 });
540 Mock::given(method("GET"))
541 .and(path(format!("/{}/log/last", did_encoded)))
542 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
543 .mount(&mock_server)
544 .await;
545 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
546 Mock::given(method("GET"))
547 .and(path(format!("/{}", did_encoded)))
548 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
549 .mount(&mock_server)
550 .await;
551 Mock::given(method("POST"))
552 .and(path(format!("/{}", did_encoded)))
553 .respond_with(ResponseTemplate::new(200))
554 .expect(1)
555 .mount(&mock_server)
556 .await;
557 unsafe {
558 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
559 }
560 let sign_res = client
561 .post(format!(
562 "{}/xrpc/com.atproto.identity.signPlcOperation",
563 base_url().await
564 ))
565 .bearer_auth(&token)
566 .json(&json!({ "token": plc_token }))
567 .send()
568 .await
569 .expect("Sign failed");
570 assert_eq!(sign_res.status(), StatusCode::OK);
571 let sign_body: Value = sign_res.json().await.unwrap();
572 let signed_operation = sign_body.get("operation")
573 .expect("Response should contain operation")
574 .clone();
575 assert!(signed_operation.get("sig").is_some());
576 assert!(signed_operation.get("prev").is_some());
577 let submit_res = client
578 .post(format!(
579 "{}/xrpc/com.atproto.identity.submitPlcOperation",
580 base_url().await
581 ))
582 .bearer_auth(&token)
583 .json(&json!({ "operation": signed_operation }))
584 .send()
585 .await
586 .expect("Submit failed");
587 let submit_status = submit_res.status();
588 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
589 assert_eq!(
590 submit_status,
591 StatusCode::OK,
592 "Full sign and submit flow should succeed. Response: {:?}",
593 submit_body
594 );
595}
596#[tokio::test]
597async fn test_cross_pds_migration_with_records() {
598 let client = client();
599 let (token, did) = create_account_and_login(&client).await;
600 let key_bytes = get_user_signing_key(&did).await
601 .expect("Failed to get user signing key");
602 let signing_key = SigningKey::from_slice(&key_bytes)
603 .expect("Failed to create signing key");
604 let handle = get_user_handle(&did).await
605 .expect("Failed to get user handle");
606 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
607 let pds_endpoint = format!("https://{}", hostname);
608 let post_payload = json!({
609 "repo": did,
610 "collection": "app.bsky.feed.post",
611 "record": {
612 "$type": "app.bsky.feed.post",
613 "text": "Test post before migration",
614 "createdAt": chrono::Utc::now().to_rfc3339(),
615 }
616 });
617 let create_res = client
618 .post(format!(
619 "{}/xrpc/com.atproto.repo.createRecord",
620 base_url().await
621 ))
622 .bearer_auth(&token)
623 .json(&post_payload)
624 .send()
625 .await
626 .expect("Failed to create post");
627 assert_eq!(create_res.status(), StatusCode::OK);
628 let create_body: Value = create_res.json().await.unwrap();
629 let original_uri = create_body["uri"].as_str().unwrap().to_string();
630 let export_res = client
631 .get(format!(
632 "{}/xrpc/com.atproto.sync.getRepo?did={}",
633 base_url().await,
634 did
635 ))
636 .send()
637 .await
638 .expect("Export failed");
639 assert_eq!(export_res.status(), StatusCode::OK);
640 let car_bytes = export_res.bytes().await.unwrap();
641 assert!(car_bytes.len() > 100, "CAR file should have meaningful content");
642 let mock_server = MockServer::start().await;
643 let did_encoded = urlencoding::encode(&did);
644 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
645 Mock::given(method("GET"))
646 .and(path(format!("/{}", did_encoded)))
647 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
648 .mount(&mock_server)
649 .await;
650 unsafe {
651 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
652 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
653 }
654 let import_res = client
655 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
656 .bearer_auth(&token)
657 .header("Content-Type", "application/vnd.ipld.car")
658 .body(car_bytes.to_vec())
659 .send()
660 .await
661 .expect("Import failed");
662 let import_status = import_res.status();
663 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
664 unsafe {
665 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
666 }
667 assert_eq!(
668 import_status,
669 StatusCode::OK,
670 "Import with valid DID document should succeed. Response: {:?}",
671 import_body
672 );
673 let get_record_res = client
674 .get(format!(
675 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}",
676 base_url().await,
677 did,
678 original_uri.split('/').last().unwrap()
679 ))
680 .send()
681 .await
682 .expect("Get record failed");
683 assert_eq!(
684 get_record_res.status(),
685 StatusCode::OK,
686 "Record should be retrievable after import"
687 );
688 let record_body: Value = get_record_res.json().await.unwrap();
689 assert_eq!(
690 record_body["value"]["text"],
691 "Test post before migration",
692 "Record content should match"
693 );
694}
695#[tokio::test]
696async fn test_migration_rejects_wrong_did_document() {
697 let client = client();
698 let (token, did) = create_account_and_login(&client).await;
699 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng());
700 let handle = get_user_handle(&did).await
701 .expect("Failed to get user handle");
702 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
703 let pds_endpoint = format!("https://{}", hostname);
704 let export_res = client
705 .get(format!(
706 "{}/xrpc/com.atproto.sync.getRepo?did={}",
707 base_url().await,
708 did
709 ))
710 .send()
711 .await
712 .expect("Export failed");
713 assert_eq!(export_res.status(), StatusCode::OK);
714 let car_bytes = export_res.bytes().await.unwrap();
715 let mock_server = MockServer::start().await;
716 let did_encoded = urlencoding::encode(&did);
717 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint);
718 Mock::given(method("GET"))
719 .and(path(format!("/{}", did_encoded)))
720 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc))
721 .mount(&mock_server)
722 .await;
723 unsafe {
724 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
725 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
726 }
727 let import_res = client
728 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
729 .bearer_auth(&token)
730 .header("Content-Type", "application/vnd.ipld.car")
731 .body(car_bytes.to_vec())
732 .send()
733 .await
734 .expect("Import failed");
735 let import_status = import_res.status();
736 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
737 unsafe {
738 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
739 }
740 assert_eq!(
741 import_status,
742 StatusCode::BAD_REQUEST,
743 "Import with wrong DID document should fail. Response: {:?}",
744 import_body
745 );
746 assert!(
747 import_body["error"] == "InvalidSignature" ||
748 import_body["message"].as_str().unwrap_or("").contains("signature"),
749 "Error should mention signature verification failure"
750 );
751}
752#[tokio::test]
753#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"]
754async fn test_full_migration_flow_end_to_end() {
755 let client = client();
756 let (token, did) = create_account_and_login(&client).await;
757 let key_bytes = get_user_signing_key(&did).await
758 .expect("Failed to get user signing key");
759 let signing_key = SigningKey::from_slice(&key_bytes)
760 .expect("Failed to create signing key");
761 let handle = get_user_handle(&did).await
762 .expect("Failed to get user handle");
763 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
764 let pds_endpoint = format!("https://{}", hostname);
765 let did_key = signing_key_to_did_key(&signing_key);
766 for i in 0..3 {
767 let post_payload = json!({
768 "repo": did,
769 "collection": "app.bsky.feed.post",
770 "record": {
771 "$type": "app.bsky.feed.post",
772 "text": format!("Pre-migration post #{}", i),
773 "createdAt": chrono::Utc::now().to_rfc3339(),
774 }
775 });
776 let res = client
777 .post(format!(
778 "{}/xrpc/com.atproto.repo.createRecord",
779 base_url().await
780 ))
781 .bearer_auth(&token)
782 .json(&post_payload)
783 .send()
784 .await
785 .expect("Failed to create post");
786 assert_eq!(res.status(), StatusCode::OK);
787 }
788 let request_res = client
789 .post(format!(
790 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
791 base_url().await
792 ))
793 .bearer_auth(&token)
794 .send()
795 .await
796 .expect("Request failed");
797 assert_eq!(request_res.status(), StatusCode::OK);
798 let plc_token = get_plc_token_from_db(&did).await
799 .expect("PLC token not found");
800 let mock_server = MockServer::start().await;
801 let did_encoded = urlencoding::encode(&did);
802 let last_op = json!({
803 "type": "plc_operation",
804 "rotationKeys": [did_key.clone()],
805 "verificationMethods": { "atproto": did_key.clone() },
806 "alsoKnownAs": [format!("at://{}", handle)],
807 "services": {
808 "atproto_pds": {
809 "type": "AtprotoPersonalDataServer",
810 "endpoint": pds_endpoint.clone()
811 }
812 },
813 "prev": null,
814 "sig": "initial_sig"
815 });
816 Mock::given(method("GET"))
817 .and(path(format!("/{}/log/last", did_encoded)))
818 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
819 .mount(&mock_server)
820 .await;
821 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
822 Mock::given(method("GET"))
823 .and(path(format!("/{}", did_encoded)))
824 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
825 .mount(&mock_server)
826 .await;
827 Mock::given(method("POST"))
828 .and(path(format!("/{}", did_encoded)))
829 .respond_with(ResponseTemplate::new(200))
830 .expect(1)
831 .mount(&mock_server)
832 .await;
833 unsafe {
834 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
835 }
836 let sign_res = client
837 .post(format!(
838 "{}/xrpc/com.atproto.identity.signPlcOperation",
839 base_url().await
840 ))
841 .bearer_auth(&token)
842 .json(&json!({ "token": plc_token }))
843 .send()
844 .await
845 .expect("Sign failed");
846 assert_eq!(sign_res.status(), StatusCode::OK);
847 let sign_body: Value = sign_res.json().await.unwrap();
848 let signed_op = sign_body.get("operation").unwrap().clone();
849 let export_res = client
850 .get(format!(
851 "{}/xrpc/com.atproto.sync.getRepo?did={}",
852 base_url().await,
853 did
854 ))
855 .send()
856 .await
857 .expect("Export failed");
858 assert_eq!(export_res.status(), StatusCode::OK);
859 let car_bytes = export_res.bytes().await.unwrap();
860 let submit_res = client
861 .post(format!(
862 "{}/xrpc/com.atproto.identity.submitPlcOperation",
863 base_url().await
864 ))
865 .bearer_auth(&token)
866 .json(&json!({ "operation": signed_op }))
867 .send()
868 .await
869 .expect("Submit failed");
870 assert_eq!(submit_res.status(), StatusCode::OK);
871 unsafe {
872 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
873 }
874 let import_res = client
875 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
876 .bearer_auth(&token)
877 .header("Content-Type", "application/vnd.ipld.car")
878 .body(car_bytes.to_vec())
879 .send()
880 .await
881 .expect("Import failed");
882 let import_status = import_res.status();
883 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
884 unsafe {
885 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
886 }
887 assert_eq!(
888 import_status,
889 StatusCode::OK,
890 "Full migration flow should succeed. Response: {:?}",
891 import_body
892 );
893 let list_res = client
894 .get(format!(
895 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post",
896 base_url().await,
897 did
898 ))
899 .send()
900 .await
901 .expect("List failed");
902 assert_eq!(list_res.status(), StatusCode::OK);
903 let list_body: Value = list_res.json().await.unwrap();
904 let records = list_body["records"].as_array()
905 .expect("Should have records array");
906 assert!(
907 records.len() >= 1,
908 "Should have at least 1 record after migration, found {}",
909 records.len()
910 );
911}