this repo has no description
1mod common;
2use common::*;
3
4use k256::ecdsa::SigningKey;
5use reqwest::StatusCode;
6use serde_json::{json, Value};
7use sqlx::PgPool;
8use wiremock::matchers::{method, path};
9use wiremock::{Mock, MockServer, ResponseTemplate};
10
/// Encodes `x` as a variable-length unsigned integer.
///
/// The value is emitted seven bits at a time, least-significant group first.
/// Every byte except the last has its high bit (`0x80`) set to signal that
/// more bytes follow. Zero encodes as the single byte `0x00`.
fn encode_uvarint(mut x: u64) -> Vec<u8> {
    let mut bytes = Vec::new();
    loop {
        // Peel off the low seven bits, then see whether anything is left.
        let low_bits = (x & 0x7F) as u8;
        x >>= 7;
        if x == 0 {
            // Final group: no continuation bit.
            bytes.push(low_bits);
            return bytes;
        }
        bytes.push(low_bits | 0x80);
    }
}
20
21fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
22 let verifying_key = signing_key.verifying_key();
23 let point = verifying_key.to_encoded_point(true);
24 let compressed_bytes = point.as_bytes();
25
26 let mut prefixed = vec![0xe7, 0x01];
27 prefixed.extend_from_slice(compressed_bytes);
28
29 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
30 format!("did:key:{}", encoded)
31}
32
33fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String {
34 let public_key = signing_key.verifying_key();
35 let compressed = public_key.to_sec1_bytes();
36
37 let mut buf = encode_uvarint(0xE7);
38 buf.extend_from_slice(&compressed);
39 multibase::encode(multibase::Base::Base58Btc, buf)
40}
41
42async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> {
43 let db_url = get_db_connection_string().await;
44 let pool = PgPool::connect(&db_url).await.ok()?;
45
46 let row = sqlx::query!(
47 r#"
48 SELECT k.key_bytes, k.encryption_version
49 FROM user_keys k
50 JOIN users u ON k.user_id = u.id
51 WHERE u.did = $1
52 "#,
53 did
54 )
55 .fetch_optional(&pool)
56 .await
57 .ok()??;
58
59 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok()
60}
61
62async fn get_plc_token_from_db(did: &str) -> Option<String> {
63 let db_url = get_db_connection_string().await;
64 let pool = PgPool::connect(&db_url).await.ok()?;
65
66 sqlx::query_scalar!(
67 r#"
68 SELECT t.token
69 FROM plc_operation_tokens t
70 JOIN users u ON t.user_id = u.id
71 WHERE u.did = $1
72 "#,
73 did
74 )
75 .fetch_optional(&pool)
76 .await
77 .ok()?
78}
79
80async fn get_user_handle(did: &str) -> Option<String> {
81 let db_url = get_db_connection_string().await;
82 let pool = PgPool::connect(&db_url).await.ok()?;
83
84 sqlx::query_scalar!(
85 r#"SELECT handle FROM users WHERE did = $1"#,
86 did
87 )
88 .fetch_optional(&pool)
89 .await
90 .ok()?
91}
92
93fn create_mock_last_op(
94 _did: &str,
95 handle: &str,
96 signing_key: &SigningKey,
97 pds_endpoint: &str,
98) -> Value {
99 let did_key = signing_key_to_did_key(signing_key);
100
101 json!({
102 "type": "plc_operation",
103 "rotationKeys": [did_key.clone()],
104 "verificationMethods": {
105 "atproto": did_key
106 },
107 "alsoKnownAs": [format!("at://{}", handle)],
108 "services": {
109 "atproto_pds": {
110 "type": "AtprotoPersonalDataServer",
111 "endpoint": pds_endpoint
112 }
113 },
114 "prev": null,
115 "sig": "mock_signature_for_testing"
116 })
117}
118
119fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value {
120 let multikey = get_multikey_from_signing_key(signing_key);
121
122 json!({
123 "@context": [
124 "https://www.w3.org/ns/did/v1",
125 "https://w3id.org/security/multikey/v1"
126 ],
127 "id": did,
128 "alsoKnownAs": [format!("at://{}", handle)],
129 "verificationMethod": [{
130 "id": format!("{}#atproto", did),
131 "type": "Multikey",
132 "controller": did,
133 "publicKeyMultibase": multikey
134 }],
135 "service": [{
136 "id": "#atproto_pds",
137 "type": "AtprotoPersonalDataServer",
138 "serviceEndpoint": pds_endpoint
139 }]
140 })
141}
142
143async fn setup_mock_plc_for_sign(
144 did: &str,
145 handle: &str,
146 signing_key: &SigningKey,
147 pds_endpoint: &str,
148) -> MockServer {
149 let mock_server = MockServer::start().await;
150
151 let did_encoded = urlencoding::encode(did);
152 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint);
153
154 Mock::given(method("GET"))
155 .and(path(format!("/{}/log/last", did_encoded)))
156 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
157 .mount(&mock_server)
158 .await;
159
160 mock_server
161}
162
163async fn setup_mock_plc_for_submit(
164 did: &str,
165 handle: &str,
166 signing_key: &SigningKey,
167 pds_endpoint: &str,
168) -> MockServer {
169 let mock_server = MockServer::start().await;
170
171 let did_encoded = urlencoding::encode(did);
172 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint);
173
174 Mock::given(method("GET"))
175 .and(path(format!("/{}", did_encoded)))
176 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone()))
177 .mount(&mock_server)
178 .await;
179
180 Mock::given(method("POST"))
181 .and(path(format!("/{}", did_encoded)))
182 .respond_with(ResponseTemplate::new(200))
183 .mount(&mock_server)
184 .await;
185
186 mock_server
187}
188
189#[tokio::test]
190async fn test_full_plc_operation_flow() {
191 let client = client();
192 let (token, did) = create_account_and_login(&client).await;
193
194 let key_bytes = get_user_signing_key(&did).await
195 .expect("Failed to get user signing key");
196 let signing_key = SigningKey::from_slice(&key_bytes)
197 .expect("Failed to create signing key");
198
199 let handle = get_user_handle(&did).await
200 .expect("Failed to get user handle");
201
202 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
203 let pds_endpoint = format!("https://{}", hostname);
204
205 let request_res = client
206 .post(format!(
207 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
208 base_url().await
209 ))
210 .bearer_auth(&token)
211 .send()
212 .await
213 .expect("Request failed");
214
215 assert_eq!(request_res.status(), StatusCode::OK);
216
217 let plc_token = get_plc_token_from_db(&did).await
218 .expect("PLC token not found in database");
219
220 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
221
222 unsafe {
223 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
224 }
225
226 let sign_res = client
227 .post(format!(
228 "{}/xrpc/com.atproto.identity.signPlcOperation",
229 base_url().await
230 ))
231 .bearer_auth(&token)
232 .json(&json!({
233 "token": plc_token
234 }))
235 .send()
236 .await
237 .expect("Sign request failed");
238
239 let sign_status = sign_res.status();
240 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
241
242 assert_eq!(
243 sign_status,
244 StatusCode::OK,
245 "Sign PLC operation should succeed. Response: {:?}",
246 sign_body
247 );
248
249 let operation = sign_body.get("operation")
250 .expect("Response should contain operation");
251
252 assert!(operation.get("sig").is_some(), "Operation should be signed");
253 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation"));
254 assert!(operation.get("prev").is_some(), "Operation should have prev reference");
255}
256
257#[tokio::test]
258#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"]
259async fn test_sign_plc_operation_consumes_token() {
260 let client = client();
261 let (token, did) = create_account_and_login(&client).await;
262
263 let key_bytes = get_user_signing_key(&did).await
264 .expect("Failed to get user signing key");
265 let signing_key = SigningKey::from_slice(&key_bytes)
266 .expect("Failed to create signing key");
267
268 let handle = get_user_handle(&did).await
269 .expect("Failed to get user handle");
270
271 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
272 let pds_endpoint = format!("https://{}", hostname);
273
274 let request_res = client
275 .post(format!(
276 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
277 base_url().await
278 ))
279 .bearer_auth(&token)
280 .send()
281 .await
282 .expect("Request failed");
283
284 assert_eq!(request_res.status(), StatusCode::OK);
285
286 let plc_token = get_plc_token_from_db(&did).await
287 .expect("PLC token not found in database");
288
289 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
290
291 unsafe {
292 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
293 }
294
295 let sign_res = client
296 .post(format!(
297 "{}/xrpc/com.atproto.identity.signPlcOperation",
298 base_url().await
299 ))
300 .bearer_auth(&token)
301 .json(&json!({
302 "token": plc_token
303 }))
304 .send()
305 .await
306 .expect("Sign request failed");
307
308 assert_eq!(sign_res.status(), StatusCode::OK);
309
310 let sign_res_2 = client
311 .post(format!(
312 "{}/xrpc/com.atproto.identity.signPlcOperation",
313 base_url().await
314 ))
315 .bearer_auth(&token)
316 .json(&json!({
317 "token": plc_token
318 }))
319 .send()
320 .await
321 .expect("Second sign request failed");
322
323 assert_eq!(
324 sign_res_2.status(),
325 StatusCode::BAD_REQUEST,
326 "Using the same token twice should fail"
327 );
328
329 let body: Value = sign_res_2.json().await.unwrap();
330 assert!(
331 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken",
332 "Error should indicate invalid/expired token"
333 );
334}
335
336#[tokio::test]
337async fn test_sign_plc_operation_with_custom_fields() {
338 let client = client();
339 let (token, did) = create_account_and_login(&client).await;
340
341 let key_bytes = get_user_signing_key(&did).await
342 .expect("Failed to get user signing key");
343 let signing_key = SigningKey::from_slice(&key_bytes)
344 .expect("Failed to create signing key");
345
346 let handle = get_user_handle(&did).await
347 .expect("Failed to get user handle");
348
349 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
350 let pds_endpoint = format!("https://{}", hostname);
351
352 let request_res = client
353 .post(format!(
354 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
355 base_url().await
356 ))
357 .bearer_auth(&token)
358 .send()
359 .await
360 .expect("Request failed");
361
362 assert_eq!(request_res.status(), StatusCode::OK);
363
364 let plc_token = get_plc_token_from_db(&did).await
365 .expect("PLC token not found in database");
366
367 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
368
369 unsafe {
370 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
371 }
372
373 let did_key = signing_key_to_did_key(&signing_key);
374
375 let sign_res = client
376 .post(format!(
377 "{}/xrpc/com.atproto.identity.signPlcOperation",
378 base_url().await
379 ))
380 .bearer_auth(&token)
381 .json(&json!({
382 "token": plc_token,
383 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"],
384 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"]
385 }))
386 .send()
387 .await
388 .expect("Sign request failed");
389
390 let sign_status = sign_res.status();
391 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
392
393 assert_eq!(
394 sign_status,
395 StatusCode::OK,
396 "Sign with custom fields should succeed. Response: {:?}",
397 sign_body
398 );
399
400 let operation = sign_body.get("operation").expect("Should have operation");
401 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array());
402 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array());
403
404 assert!(also_known_as.is_some(), "Should have alsoKnownAs");
405 assert!(rotation_keys.is_some(), "Should have rotationKeys");
406 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases");
407 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys");
408}
409
410#[tokio::test]
411async fn test_submit_plc_operation_success() {
412 let client = client();
413 let (token, did) = create_account_and_login(&client).await;
414
415 let key_bytes = get_user_signing_key(&did).await
416 .expect("Failed to get user signing key");
417 let signing_key = SigningKey::from_slice(&key_bytes)
418 .expect("Failed to create signing key");
419
420 let handle = get_user_handle(&did).await
421 .expect("Failed to get user handle");
422
423 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
424 let pds_endpoint = format!("https://{}", hostname);
425
426 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
427
428 unsafe {
429 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
430 }
431
432 let did_key = signing_key_to_did_key(&signing_key);
433
434 let operation = json!({
435 "type": "plc_operation",
436 "rotationKeys": [did_key.clone()],
437 "verificationMethods": {
438 "atproto": did_key.clone()
439 },
440 "alsoKnownAs": [format!("at://{}", handle)],
441 "services": {
442 "atproto_pds": {
443 "type": "AtprotoPersonalDataServer",
444 "endpoint": pds_endpoint
445 }
446 },
447 "prev": "bafyreiabc123",
448 "sig": "test_signature_base64"
449 });
450
451 let submit_res = client
452 .post(format!(
453 "{}/xrpc/com.atproto.identity.submitPlcOperation",
454 base_url().await
455 ))
456 .bearer_auth(&token)
457 .json(&json!({ "operation": operation }))
458 .send()
459 .await
460 .expect("Submit request failed");
461
462 let submit_status = submit_res.status();
463 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
464
465 assert_eq!(
466 submit_status,
467 StatusCode::OK,
468 "Submit PLC operation should succeed. Response: {:?}",
469 submit_body
470 );
471}
472
473#[tokio::test]
474async fn test_submit_plc_operation_wrong_endpoint_rejected() {
475 let client = client();
476 let (token, did) = create_account_and_login(&client).await;
477
478 let key_bytes = get_user_signing_key(&did).await
479 .expect("Failed to get user signing key");
480 let signing_key = SigningKey::from_slice(&key_bytes)
481 .expect("Failed to create signing key");
482
483 let handle = get_user_handle(&did).await
484 .expect("Failed to get user handle");
485
486 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
487 let pds_endpoint = format!("https://{}", hostname);
488
489 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
490
491 unsafe {
492 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
493 }
494
495 let did_key = signing_key_to_did_key(&signing_key);
496
497 let operation = json!({
498 "type": "plc_operation",
499 "rotationKeys": [did_key.clone()],
500 "verificationMethods": {
501 "atproto": did_key.clone()
502 },
503 "alsoKnownAs": [format!("at://{}", handle)],
504 "services": {
505 "atproto_pds": {
506 "type": "AtprotoPersonalDataServer",
507 "endpoint": "https://wrong-pds.example.com"
508 }
509 },
510 "prev": "bafyreiabc123",
511 "sig": "test_signature_base64"
512 });
513
514 let submit_res = client
515 .post(format!(
516 "{}/xrpc/com.atproto.identity.submitPlcOperation",
517 base_url().await
518 ))
519 .bearer_auth(&token)
520 .json(&json!({ "operation": operation }))
521 .send()
522 .await
523 .expect("Submit request failed");
524
525 assert_eq!(
526 submit_res.status(),
527 StatusCode::BAD_REQUEST,
528 "Submit with wrong endpoint should fail"
529 );
530
531 let body: Value = submit_res.json().await.unwrap();
532 assert_eq!(body["error"], "InvalidRequest");
533}
534
535#[tokio::test]
536async fn test_submit_plc_operation_wrong_signing_key_rejected() {
537 let client = client();
538 let (token, did) = create_account_and_login(&client).await;
539
540 let key_bytes = get_user_signing_key(&did).await
541 .expect("Failed to get user signing key");
542 let signing_key = SigningKey::from_slice(&key_bytes)
543 .expect("Failed to create signing key");
544
545 let handle = get_user_handle(&did).await
546 .expect("Failed to get user handle");
547
548 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
549 let pds_endpoint = format!("https://{}", hostname);
550
551 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
552
553 unsafe {
554 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
555 }
556
557 let wrong_key = SigningKey::random(&mut rand::thread_rng());
558 let wrong_did_key = signing_key_to_did_key(&wrong_key);
559 let correct_did_key = signing_key_to_did_key(&signing_key);
560
561 let operation = json!({
562 "type": "plc_operation",
563 "rotationKeys": [correct_did_key.clone()],
564 "verificationMethods": {
565 "atproto": wrong_did_key
566 },
567 "alsoKnownAs": [format!("at://{}", handle)],
568 "services": {
569 "atproto_pds": {
570 "type": "AtprotoPersonalDataServer",
571 "endpoint": pds_endpoint
572 }
573 },
574 "prev": "bafyreiabc123",
575 "sig": "test_signature_base64"
576 });
577
578 let submit_res = client
579 .post(format!(
580 "{}/xrpc/com.atproto.identity.submitPlcOperation",
581 base_url().await
582 ))
583 .bearer_auth(&token)
584 .json(&json!({ "operation": operation }))
585 .send()
586 .await
587 .expect("Submit request failed");
588
589 assert_eq!(
590 submit_res.status(),
591 StatusCode::BAD_REQUEST,
592 "Submit with wrong signing key should fail"
593 );
594
595 let body: Value = submit_res.json().await.unwrap();
596 assert_eq!(body["error"], "InvalidRequest");
597}
598
599#[tokio::test]
600async fn test_full_sign_and_submit_flow() {
601 let client = client();
602 let (token, did) = create_account_and_login(&client).await;
603
604 let key_bytes = get_user_signing_key(&did).await
605 .expect("Failed to get user signing key");
606 let signing_key = SigningKey::from_slice(&key_bytes)
607 .expect("Failed to create signing key");
608
609 let handle = get_user_handle(&did).await
610 .expect("Failed to get user handle");
611
612 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
613 let pds_endpoint = format!("https://{}", hostname);
614
615 let request_res = client
616 .post(format!(
617 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
618 base_url().await
619 ))
620 .bearer_auth(&token)
621 .send()
622 .await
623 .expect("Request failed");
624 assert_eq!(request_res.status(), StatusCode::OK);
625
626 let plc_token = get_plc_token_from_db(&did).await
627 .expect("PLC token not found");
628
629 let mock_server = MockServer::start().await;
630 let did_encoded = urlencoding::encode(&did);
631 let did_key = signing_key_to_did_key(&signing_key);
632
633 let last_op = json!({
634 "type": "plc_operation",
635 "rotationKeys": [did_key.clone()],
636 "verificationMethods": {
637 "atproto": did_key.clone()
638 },
639 "alsoKnownAs": [format!("at://{}", handle)],
640 "services": {
641 "atproto_pds": {
642 "type": "AtprotoPersonalDataServer",
643 "endpoint": pds_endpoint.clone()
644 }
645 },
646 "prev": null,
647 "sig": "initial_sig"
648 });
649
650 Mock::given(method("GET"))
651 .and(path(format!("/{}/log/last", did_encoded)))
652 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
653 .mount(&mock_server)
654 .await;
655
656 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
657 Mock::given(method("GET"))
658 .and(path(format!("/{}", did_encoded)))
659 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
660 .mount(&mock_server)
661 .await;
662
663 Mock::given(method("POST"))
664 .and(path(format!("/{}", did_encoded)))
665 .respond_with(ResponseTemplate::new(200))
666 .expect(1)
667 .mount(&mock_server)
668 .await;
669
670 unsafe {
671 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
672 }
673
674 let sign_res = client
675 .post(format!(
676 "{}/xrpc/com.atproto.identity.signPlcOperation",
677 base_url().await
678 ))
679 .bearer_auth(&token)
680 .json(&json!({ "token": plc_token }))
681 .send()
682 .await
683 .expect("Sign failed");
684
685 assert_eq!(sign_res.status(), StatusCode::OK);
686
687 let sign_body: Value = sign_res.json().await.unwrap();
688 let signed_operation = sign_body.get("operation")
689 .expect("Response should contain operation")
690 .clone();
691
692 assert!(signed_operation.get("sig").is_some());
693 assert!(signed_operation.get("prev").is_some());
694
695 let submit_res = client
696 .post(format!(
697 "{}/xrpc/com.atproto.identity.submitPlcOperation",
698 base_url().await
699 ))
700 .bearer_auth(&token)
701 .json(&json!({ "operation": signed_operation }))
702 .send()
703 .await
704 .expect("Submit failed");
705
706 let submit_status = submit_res.status();
707 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
708
709 assert_eq!(
710 submit_status,
711 StatusCode::OK,
712 "Full sign and submit flow should succeed. Response: {:?}",
713 submit_body
714 );
715}
716
717#[tokio::test]
718async fn test_cross_pds_migration_with_records() {
719 let client = client();
720 let (token, did) = create_account_and_login(&client).await;
721
722 let key_bytes = get_user_signing_key(&did).await
723 .expect("Failed to get user signing key");
724 let signing_key = SigningKey::from_slice(&key_bytes)
725 .expect("Failed to create signing key");
726
727 let handle = get_user_handle(&did).await
728 .expect("Failed to get user handle");
729
730 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
731 let pds_endpoint = format!("https://{}", hostname);
732
733 let post_payload = json!({
734 "repo": did,
735 "collection": "app.bsky.feed.post",
736 "record": {
737 "$type": "app.bsky.feed.post",
738 "text": "Test post before migration",
739 "createdAt": chrono::Utc::now().to_rfc3339(),
740 }
741 });
742
743 let create_res = client
744 .post(format!(
745 "{}/xrpc/com.atproto.repo.createRecord",
746 base_url().await
747 ))
748 .bearer_auth(&token)
749 .json(&post_payload)
750 .send()
751 .await
752 .expect("Failed to create post");
753 assert_eq!(create_res.status(), StatusCode::OK);
754
755 let create_body: Value = create_res.json().await.unwrap();
756 let original_uri = create_body["uri"].as_str().unwrap().to_string();
757
758 let export_res = client
759 .get(format!(
760 "{}/xrpc/com.atproto.sync.getRepo?did={}",
761 base_url().await,
762 did
763 ))
764 .send()
765 .await
766 .expect("Export failed");
767 assert_eq!(export_res.status(), StatusCode::OK);
768 let car_bytes = export_res.bytes().await.unwrap();
769
770 assert!(car_bytes.len() > 100, "CAR file should have meaningful content");
771
772 let mock_server = MockServer::start().await;
773 let did_encoded = urlencoding::encode(&did);
774 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
775
776 Mock::given(method("GET"))
777 .and(path(format!("/{}", did_encoded)))
778 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
779 .mount(&mock_server)
780 .await;
781
782 unsafe {
783 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
784 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
785 }
786
787 let import_res = client
788 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
789 .bearer_auth(&token)
790 .header("Content-Type", "application/vnd.ipld.car")
791 .body(car_bytes.to_vec())
792 .send()
793 .await
794 .expect("Import failed");
795
796 let import_status = import_res.status();
797 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
798
799 unsafe {
800 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
801 }
802
803 assert_eq!(
804 import_status,
805 StatusCode::OK,
806 "Import with valid DID document should succeed. Response: {:?}",
807 import_body
808 );
809
810 let get_record_res = client
811 .get(format!(
812 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}",
813 base_url().await,
814 did,
815 original_uri.split('/').last().unwrap()
816 ))
817 .send()
818 .await
819 .expect("Get record failed");
820
821 assert_eq!(
822 get_record_res.status(),
823 StatusCode::OK,
824 "Record should be retrievable after import"
825 );
826
827 let record_body: Value = get_record_res.json().await.unwrap();
828 assert_eq!(
829 record_body["value"]["text"],
830 "Test post before migration",
831 "Record content should match"
832 );
833}
834
835#[tokio::test]
836async fn test_migration_rejects_wrong_did_document() {
837 let client = client();
838 let (token, did) = create_account_and_login(&client).await;
839
840 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng());
841
842 let handle = get_user_handle(&did).await
843 .expect("Failed to get user handle");
844
845 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
846 let pds_endpoint = format!("https://{}", hostname);
847
848 let export_res = client
849 .get(format!(
850 "{}/xrpc/com.atproto.sync.getRepo?did={}",
851 base_url().await,
852 did
853 ))
854 .send()
855 .await
856 .expect("Export failed");
857 assert_eq!(export_res.status(), StatusCode::OK);
858 let car_bytes = export_res.bytes().await.unwrap();
859
860 let mock_server = MockServer::start().await;
861 let did_encoded = urlencoding::encode(&did);
862 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint);
863
864 Mock::given(method("GET"))
865 .and(path(format!("/{}", did_encoded)))
866 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc))
867 .mount(&mock_server)
868 .await;
869
870 unsafe {
871 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
872 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
873 }
874
875 let import_res = client
876 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
877 .bearer_auth(&token)
878 .header("Content-Type", "application/vnd.ipld.car")
879 .body(car_bytes.to_vec())
880 .send()
881 .await
882 .expect("Import failed");
883
884 let import_status = import_res.status();
885 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
886
887 unsafe {
888 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
889 }
890
891 assert_eq!(
892 import_status,
893 StatusCode::BAD_REQUEST,
894 "Import with wrong DID document should fail. Response: {:?}",
895 import_body
896 );
897
898 assert!(
899 import_body["error"] == "InvalidSignature" ||
900 import_body["message"].as_str().unwrap_or("").contains("signature"),
901 "Error should mention signature verification failure"
902 );
903}
904
905#[tokio::test]
906#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"]
907async fn test_full_migration_flow_end_to_end() {
908 let client = client();
909 let (token, did) = create_account_and_login(&client).await;
910
911 let key_bytes = get_user_signing_key(&did).await
912 .expect("Failed to get user signing key");
913 let signing_key = SigningKey::from_slice(&key_bytes)
914 .expect("Failed to create signing key");
915
916 let handle = get_user_handle(&did).await
917 .expect("Failed to get user handle");
918
919 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
920 let pds_endpoint = format!("https://{}", hostname);
921 let did_key = signing_key_to_did_key(&signing_key);
922
923 for i in 0..3 {
924 let post_payload = json!({
925 "repo": did,
926 "collection": "app.bsky.feed.post",
927 "record": {
928 "$type": "app.bsky.feed.post",
929 "text": format!("Pre-migration post #{}", i),
930 "createdAt": chrono::Utc::now().to_rfc3339(),
931 }
932 });
933
934 let res = client
935 .post(format!(
936 "{}/xrpc/com.atproto.repo.createRecord",
937 base_url().await
938 ))
939 .bearer_auth(&token)
940 .json(&post_payload)
941 .send()
942 .await
943 .expect("Failed to create post");
944 assert_eq!(res.status(), StatusCode::OK);
945 }
946
947 let request_res = client
948 .post(format!(
949 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
950 base_url().await
951 ))
952 .bearer_auth(&token)
953 .send()
954 .await
955 .expect("Request failed");
956 assert_eq!(request_res.status(), StatusCode::OK);
957
958 let plc_token = get_plc_token_from_db(&did).await
959 .expect("PLC token not found");
960
961 let mock_server = MockServer::start().await;
962 let did_encoded = urlencoding::encode(&did);
963
964 let last_op = json!({
965 "type": "plc_operation",
966 "rotationKeys": [did_key.clone()],
967 "verificationMethods": { "atproto": did_key.clone() },
968 "alsoKnownAs": [format!("at://{}", handle)],
969 "services": {
970 "atproto_pds": {
971 "type": "AtprotoPersonalDataServer",
972 "endpoint": pds_endpoint.clone()
973 }
974 },
975 "prev": null,
976 "sig": "initial_sig"
977 });
978
979 Mock::given(method("GET"))
980 .and(path(format!("/{}/log/last", did_encoded)))
981 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
982 .mount(&mock_server)
983 .await;
984
985 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
986 Mock::given(method("GET"))
987 .and(path(format!("/{}", did_encoded)))
988 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
989 .mount(&mock_server)
990 .await;
991
992 Mock::given(method("POST"))
993 .and(path(format!("/{}", did_encoded)))
994 .respond_with(ResponseTemplate::new(200))
995 .expect(1)
996 .mount(&mock_server)
997 .await;
998
999 unsafe {
1000 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
1001 }
1002
1003 let sign_res = client
1004 .post(format!(
1005 "{}/xrpc/com.atproto.identity.signPlcOperation",
1006 base_url().await
1007 ))
1008 .bearer_auth(&token)
1009 .json(&json!({ "token": plc_token }))
1010 .send()
1011 .await
1012 .expect("Sign failed");
1013 assert_eq!(sign_res.status(), StatusCode::OK);
1014
1015 let sign_body: Value = sign_res.json().await.unwrap();
1016 let signed_op = sign_body.get("operation").unwrap().clone();
1017
1018 let export_res = client
1019 .get(format!(
1020 "{}/xrpc/com.atproto.sync.getRepo?did={}",
1021 base_url().await,
1022 did
1023 ))
1024 .send()
1025 .await
1026 .expect("Export failed");
1027 assert_eq!(export_res.status(), StatusCode::OK);
1028 let car_bytes = export_res.bytes().await.unwrap();
1029
1030 let submit_res = client
1031 .post(format!(
1032 "{}/xrpc/com.atproto.identity.submitPlcOperation",
1033 base_url().await
1034 ))
1035 .bearer_auth(&token)
1036 .json(&json!({ "operation": signed_op }))
1037 .send()
1038 .await
1039 .expect("Submit failed");
1040 assert_eq!(submit_res.status(), StatusCode::OK);
1041
1042 unsafe {
1043 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
1044 }
1045
1046 let import_res = client
1047 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
1048 .bearer_auth(&token)
1049 .header("Content-Type", "application/vnd.ipld.car")
1050 .body(car_bytes.to_vec())
1051 .send()
1052 .await
1053 .expect("Import failed");
1054
1055 let import_status = import_res.status();
1056 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
1057
1058 unsafe {
1059 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
1060 }
1061
1062 assert_eq!(
1063 import_status,
1064 StatusCode::OK,
1065 "Full migration flow should succeed. Response: {:?}",
1066 import_body
1067 );
1068
1069 let list_res = client
1070 .get(format!(
1071 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post",
1072 base_url().await,
1073 did
1074 ))
1075 .send()
1076 .await
1077 .expect("List failed");
1078 assert_eq!(list_res.status(), StatusCode::OK);
1079
1080 let list_body: Value = list_res.json().await.unwrap();
1081 let records = list_body["records"].as_array()
1082 .expect("Should have records array");
1083
1084 assert!(
1085 records.len() >= 1,
1086 "Should have at least 1 record after migration, found {}",
1087 records.len()
1088 );
1089}