this repo has no description
1mod common;
2use common::*;
3
4use k256::ecdsa::SigningKey;
5use reqwest::StatusCode;
6use serde_json::{json, Value};
7use sqlx::PgPool;
8use wiremock::matchers::{method, path};
9use wiremock::{Mock, MockServer, ResponseTemplate};
10
11fn encode_uvarint(mut x: u64) -> Vec<u8> {
12 let mut out = Vec::new();
13 while x >= 0x80 {
14 out.push(((x as u8) & 0x7F) | 0x80);
15 x >>= 7;
16 }
17 out.push(x as u8);
18 out
19}
20
21fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
22 let verifying_key = signing_key.verifying_key();
23 let point = verifying_key.to_encoded_point(true);
24 let compressed_bytes = point.as_bytes();
25
26 let mut prefixed = vec![0xe7, 0x01];
27 prefixed.extend_from_slice(compressed_bytes);
28
29 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
30 format!("did:key:{}", encoded)
31}
32
33fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String {
34 let public_key = signing_key.verifying_key();
35 let compressed = public_key.to_sec1_bytes();
36
37 let mut buf = encode_uvarint(0xE7);
38 buf.extend_from_slice(&compressed);
39 multibase::encode(multibase::Base::Base58Btc, buf)
40}
41
42async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> {
43 let db_url = get_db_connection_string().await;
44 let pool = PgPool::connect(&db_url).await.ok()?;
45
46 let row = sqlx::query!(
47 r#"
48 SELECT k.key_bytes, k.encryption_version
49 FROM user_keys k
50 JOIN users u ON k.user_id = u.id
51 WHERE u.did = $1
52 "#,
53 did
54 )
55 .fetch_optional(&pool)
56 .await
57 .ok()??;
58
59 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok()
60}
61
62async fn get_plc_token_from_db(did: &str) -> Option<String> {
63 let db_url = get_db_connection_string().await;
64 let pool = PgPool::connect(&db_url).await.ok()?;
65
66 sqlx::query_scalar!(
67 r#"
68 SELECT t.token
69 FROM plc_operation_tokens t
70 JOIN users u ON t.user_id = u.id
71 WHERE u.did = $1
72 "#,
73 did
74 )
75 .fetch_optional(&pool)
76 .await
77 .ok()?
78}
79
80async fn get_user_handle(did: &str) -> Option<String> {
81 let db_url = get_db_connection_string().await;
82 let pool = PgPool::connect(&db_url).await.ok()?;
83
84 sqlx::query_scalar!(
85 r#"SELECT handle FROM users WHERE did = $1"#,
86 did
87 )
88 .fetch_optional(&pool)
89 .await
90 .ok()?
91}
92
93fn create_mock_last_op(
94 _did: &str,
95 handle: &str,
96 signing_key: &SigningKey,
97 pds_endpoint: &str,
98) -> Value {
99 let did_key = signing_key_to_did_key(signing_key);
100
101 json!({
102 "type": "plc_operation",
103 "rotationKeys": [did_key.clone()],
104 "verificationMethods": {
105 "atproto": did_key
106 },
107 "alsoKnownAs": [format!("at://{}", handle)],
108 "services": {
109 "atproto_pds": {
110 "type": "AtprotoPersonalDataServer",
111 "endpoint": pds_endpoint
112 }
113 },
114 "prev": null,
115 "sig": "mock_signature_for_testing"
116 })
117}
118
119fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value {
120 let multikey = get_multikey_from_signing_key(signing_key);
121
122 json!({
123 "@context": [
124 "https://www.w3.org/ns/did/v1",
125 "https://w3id.org/security/multikey/v1"
126 ],
127 "id": did,
128 "alsoKnownAs": [format!("at://{}", handle)],
129 "verificationMethod": [{
130 "id": format!("{}#atproto", did),
131 "type": "Multikey",
132 "controller": did,
133 "publicKeyMultibase": multikey
134 }],
135 "service": [{
136 "id": "#atproto_pds",
137 "type": "AtprotoPersonalDataServer",
138 "serviceEndpoint": pds_endpoint
139 }]
140 })
141}
142
143async fn setup_mock_plc_for_sign(
144 did: &str,
145 handle: &str,
146 signing_key: &SigningKey,
147 pds_endpoint: &str,
148) -> MockServer {
149 let mock_server = MockServer::start().await;
150
151 let did_encoded = urlencoding::encode(did);
152 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint);
153
154 Mock::given(method("GET"))
155 .and(path(format!("/{}/log/last", did_encoded)))
156 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
157 .mount(&mock_server)
158 .await;
159
160 mock_server
161}
162
163async fn setup_mock_plc_for_submit(
164 did: &str,
165 handle: &str,
166 signing_key: &SigningKey,
167 pds_endpoint: &str,
168) -> MockServer {
169 let mock_server = MockServer::start().await;
170
171 let did_encoded = urlencoding::encode(did);
172 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint);
173
174 Mock::given(method("GET"))
175 .and(path(format!("/{}", did_encoded)))
176 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone()))
177 .mount(&mock_server)
178 .await;
179
180 Mock::given(method("POST"))
181 .and(path(format!("/{}", did_encoded)))
182 .respond_with(ResponseTemplate::new(200))
183 .mount(&mock_server)
184 .await;
185
186 mock_server
187}
188
189#[tokio::test]
190async fn test_full_plc_operation_flow() {
191 let client = client();
192 let (token, did) = create_account_and_login(&client).await;
193
194 let key_bytes = get_user_signing_key(&did).await
195 .expect("Failed to get user signing key");
196 let signing_key = SigningKey::from_slice(&key_bytes)
197 .expect("Failed to create signing key");
198
199 let handle = get_user_handle(&did).await
200 .expect("Failed to get user handle");
201
202 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
203 let pds_endpoint = format!("https://{}", hostname);
204
205 let request_res = client
206 .post(format!(
207 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
208 base_url().await
209 ))
210 .bearer_auth(&token)
211 .send()
212 .await
213 .expect("Request failed");
214
215 assert_eq!(request_res.status(), StatusCode::OK);
216
217 let plc_token = get_plc_token_from_db(&did).await
218 .expect("PLC token not found in database");
219
220 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
221
222 unsafe {
223 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
224 }
225
226 let sign_res = client
227 .post(format!(
228 "{}/xrpc/com.atproto.identity.signPlcOperation",
229 base_url().await
230 ))
231 .bearer_auth(&token)
232 .json(&json!({
233 "token": plc_token
234 }))
235 .send()
236 .await
237 .expect("Sign request failed");
238
239 let sign_status = sign_res.status();
240 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
241
242 assert_eq!(
243 sign_status,
244 StatusCode::OK,
245 "Sign PLC operation should succeed. Response: {:?}",
246 sign_body
247 );
248
249 let operation = sign_body.get("operation")
250 .expect("Response should contain operation");
251
252 assert!(operation.get("sig").is_some(), "Operation should be signed");
253 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation"));
254 assert!(operation.get("prev").is_some(), "Operation should have prev reference");
255}
256
257#[tokio::test]
258async fn test_sign_plc_operation_consumes_token() {
259 let client = client();
260 let (token, did) = create_account_and_login(&client).await;
261
262 let key_bytes = get_user_signing_key(&did).await
263 .expect("Failed to get user signing key");
264 let signing_key = SigningKey::from_slice(&key_bytes)
265 .expect("Failed to create signing key");
266
267 let handle = get_user_handle(&did).await
268 .expect("Failed to get user handle");
269
270 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
271 let pds_endpoint = format!("https://{}", hostname);
272
273 let request_res = client
274 .post(format!(
275 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
276 base_url().await
277 ))
278 .bearer_auth(&token)
279 .send()
280 .await
281 .expect("Request failed");
282
283 assert_eq!(request_res.status(), StatusCode::OK);
284
285 let plc_token = get_plc_token_from_db(&did).await
286 .expect("PLC token not found in database");
287
288 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
289
290 unsafe {
291 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
292 }
293
294 let sign_res = client
295 .post(format!(
296 "{}/xrpc/com.atproto.identity.signPlcOperation",
297 base_url().await
298 ))
299 .bearer_auth(&token)
300 .json(&json!({
301 "token": plc_token
302 }))
303 .send()
304 .await
305 .expect("Sign request failed");
306
307 assert_eq!(sign_res.status(), StatusCode::OK);
308
309 let sign_res_2 = client
310 .post(format!(
311 "{}/xrpc/com.atproto.identity.signPlcOperation",
312 base_url().await
313 ))
314 .bearer_auth(&token)
315 .json(&json!({
316 "token": plc_token
317 }))
318 .send()
319 .await
320 .expect("Second sign request failed");
321
322 assert_eq!(
323 sign_res_2.status(),
324 StatusCode::BAD_REQUEST,
325 "Using the same token twice should fail"
326 );
327
328 let body: Value = sign_res_2.json().await.unwrap();
329 assert!(
330 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken",
331 "Error should indicate invalid/expired token"
332 );
333}
334
335#[tokio::test]
336async fn test_sign_plc_operation_with_custom_fields() {
337 let client = client();
338 let (token, did) = create_account_and_login(&client).await;
339
340 let key_bytes = get_user_signing_key(&did).await
341 .expect("Failed to get user signing key");
342 let signing_key = SigningKey::from_slice(&key_bytes)
343 .expect("Failed to create signing key");
344
345 let handle = get_user_handle(&did).await
346 .expect("Failed to get user handle");
347
348 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
349 let pds_endpoint = format!("https://{}", hostname);
350
351 let request_res = client
352 .post(format!(
353 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
354 base_url().await
355 ))
356 .bearer_auth(&token)
357 .send()
358 .await
359 .expect("Request failed");
360
361 assert_eq!(request_res.status(), StatusCode::OK);
362
363 let plc_token = get_plc_token_from_db(&did).await
364 .expect("PLC token not found in database");
365
366 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
367
368 unsafe {
369 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
370 }
371
372 let did_key = signing_key_to_did_key(&signing_key);
373
374 let sign_res = client
375 .post(format!(
376 "{}/xrpc/com.atproto.identity.signPlcOperation",
377 base_url().await
378 ))
379 .bearer_auth(&token)
380 .json(&json!({
381 "token": plc_token,
382 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"],
383 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"]
384 }))
385 .send()
386 .await
387 .expect("Sign request failed");
388
389 let sign_status = sign_res.status();
390 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
391
392 assert_eq!(
393 sign_status,
394 StatusCode::OK,
395 "Sign with custom fields should succeed. Response: {:?}",
396 sign_body
397 );
398
399 let operation = sign_body.get("operation").expect("Should have operation");
400 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array());
401 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array());
402
403 assert!(also_known_as.is_some(), "Should have alsoKnownAs");
404 assert!(rotation_keys.is_some(), "Should have rotationKeys");
405 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases");
406 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys");
407}
408
409#[tokio::test]
410async fn test_submit_plc_operation_success() {
411 let client = client();
412 let (token, did) = create_account_and_login(&client).await;
413
414 let key_bytes = get_user_signing_key(&did).await
415 .expect("Failed to get user signing key");
416 let signing_key = SigningKey::from_slice(&key_bytes)
417 .expect("Failed to create signing key");
418
419 let handle = get_user_handle(&did).await
420 .expect("Failed to get user handle");
421
422 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
423 let pds_endpoint = format!("https://{}", hostname);
424
425 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
426
427 unsafe {
428 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
429 }
430
431 let did_key = signing_key_to_did_key(&signing_key);
432
433 let operation = json!({
434 "type": "plc_operation",
435 "rotationKeys": [did_key.clone()],
436 "verificationMethods": {
437 "atproto": did_key.clone()
438 },
439 "alsoKnownAs": [format!("at://{}", handle)],
440 "services": {
441 "atproto_pds": {
442 "type": "AtprotoPersonalDataServer",
443 "endpoint": pds_endpoint
444 }
445 },
446 "prev": "bafyreiabc123",
447 "sig": "test_signature_base64"
448 });
449
450 let submit_res = client
451 .post(format!(
452 "{}/xrpc/com.atproto.identity.submitPlcOperation",
453 base_url().await
454 ))
455 .bearer_auth(&token)
456 .json(&json!({ "operation": operation }))
457 .send()
458 .await
459 .expect("Submit request failed");
460
461 let submit_status = submit_res.status();
462 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
463
464 assert_eq!(
465 submit_status,
466 StatusCode::OK,
467 "Submit PLC operation should succeed. Response: {:?}",
468 submit_body
469 );
470}
471
472#[tokio::test]
473async fn test_submit_plc_operation_wrong_endpoint_rejected() {
474 let client = client();
475 let (token, did) = create_account_and_login(&client).await;
476
477 let key_bytes = get_user_signing_key(&did).await
478 .expect("Failed to get user signing key");
479 let signing_key = SigningKey::from_slice(&key_bytes)
480 .expect("Failed to create signing key");
481
482 let handle = get_user_handle(&did).await
483 .expect("Failed to get user handle");
484
485 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
486 let pds_endpoint = format!("https://{}", hostname);
487
488 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
489
490 unsafe {
491 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
492 }
493
494 let did_key = signing_key_to_did_key(&signing_key);
495
496 let operation = json!({
497 "type": "plc_operation",
498 "rotationKeys": [did_key.clone()],
499 "verificationMethods": {
500 "atproto": did_key.clone()
501 },
502 "alsoKnownAs": [format!("at://{}", handle)],
503 "services": {
504 "atproto_pds": {
505 "type": "AtprotoPersonalDataServer",
506 "endpoint": "https://wrong-pds.example.com"
507 }
508 },
509 "prev": "bafyreiabc123",
510 "sig": "test_signature_base64"
511 });
512
513 let submit_res = client
514 .post(format!(
515 "{}/xrpc/com.atproto.identity.submitPlcOperation",
516 base_url().await
517 ))
518 .bearer_auth(&token)
519 .json(&json!({ "operation": operation }))
520 .send()
521 .await
522 .expect("Submit request failed");
523
524 assert_eq!(
525 submit_res.status(),
526 StatusCode::BAD_REQUEST,
527 "Submit with wrong endpoint should fail"
528 );
529
530 let body: Value = submit_res.json().await.unwrap();
531 assert_eq!(body["error"], "InvalidRequest");
532}
533
534#[tokio::test]
535async fn test_submit_plc_operation_wrong_signing_key_rejected() {
536 let client = client();
537 let (token, did) = create_account_and_login(&client).await;
538
539 let key_bytes = get_user_signing_key(&did).await
540 .expect("Failed to get user signing key");
541 let signing_key = SigningKey::from_slice(&key_bytes)
542 .expect("Failed to create signing key");
543
544 let handle = get_user_handle(&did).await
545 .expect("Failed to get user handle");
546
547 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
548 let pds_endpoint = format!("https://{}", hostname);
549
550 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
551
552 unsafe {
553 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
554 }
555
556 let wrong_key = SigningKey::random(&mut rand::thread_rng());
557 let wrong_did_key = signing_key_to_did_key(&wrong_key);
558 let correct_did_key = signing_key_to_did_key(&signing_key);
559
560 let operation = json!({
561 "type": "plc_operation",
562 "rotationKeys": [correct_did_key.clone()],
563 "verificationMethods": {
564 "atproto": wrong_did_key
565 },
566 "alsoKnownAs": [format!("at://{}", handle)],
567 "services": {
568 "atproto_pds": {
569 "type": "AtprotoPersonalDataServer",
570 "endpoint": pds_endpoint
571 }
572 },
573 "prev": "bafyreiabc123",
574 "sig": "test_signature_base64"
575 });
576
577 let submit_res = client
578 .post(format!(
579 "{}/xrpc/com.atproto.identity.submitPlcOperation",
580 base_url().await
581 ))
582 .bearer_auth(&token)
583 .json(&json!({ "operation": operation }))
584 .send()
585 .await
586 .expect("Submit request failed");
587
588 assert_eq!(
589 submit_res.status(),
590 StatusCode::BAD_REQUEST,
591 "Submit with wrong signing key should fail"
592 );
593
594 let body: Value = submit_res.json().await.unwrap();
595 assert_eq!(body["error"], "InvalidRequest");
596}
597
598#[tokio::test]
599async fn test_full_sign_and_submit_flow() {
600 let client = client();
601 let (token, did) = create_account_and_login(&client).await;
602
603 let key_bytes = get_user_signing_key(&did).await
604 .expect("Failed to get user signing key");
605 let signing_key = SigningKey::from_slice(&key_bytes)
606 .expect("Failed to create signing key");
607
608 let handle = get_user_handle(&did).await
609 .expect("Failed to get user handle");
610
611 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
612 let pds_endpoint = format!("https://{}", hostname);
613
614 let request_res = client
615 .post(format!(
616 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
617 base_url().await
618 ))
619 .bearer_auth(&token)
620 .send()
621 .await
622 .expect("Request failed");
623 assert_eq!(request_res.status(), StatusCode::OK);
624
625 let plc_token = get_plc_token_from_db(&did).await
626 .expect("PLC token not found");
627
628 let mock_server = MockServer::start().await;
629 let did_encoded = urlencoding::encode(&did);
630 let did_key = signing_key_to_did_key(&signing_key);
631
632 let last_op = json!({
633 "type": "plc_operation",
634 "rotationKeys": [did_key.clone()],
635 "verificationMethods": {
636 "atproto": did_key.clone()
637 },
638 "alsoKnownAs": [format!("at://{}", handle)],
639 "services": {
640 "atproto_pds": {
641 "type": "AtprotoPersonalDataServer",
642 "endpoint": pds_endpoint.clone()
643 }
644 },
645 "prev": null,
646 "sig": "initial_sig"
647 });
648
649 Mock::given(method("GET"))
650 .and(path(format!("/{}/log/last", did_encoded)))
651 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
652 .mount(&mock_server)
653 .await;
654
655 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
656 Mock::given(method("GET"))
657 .and(path(format!("/{}", did_encoded)))
658 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
659 .mount(&mock_server)
660 .await;
661
662 Mock::given(method("POST"))
663 .and(path(format!("/{}", did_encoded)))
664 .respond_with(ResponseTemplate::new(200))
665 .expect(1)
666 .mount(&mock_server)
667 .await;
668
669 unsafe {
670 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
671 }
672
673 let sign_res = client
674 .post(format!(
675 "{}/xrpc/com.atproto.identity.signPlcOperation",
676 base_url().await
677 ))
678 .bearer_auth(&token)
679 .json(&json!({ "token": plc_token }))
680 .send()
681 .await
682 .expect("Sign failed");
683
684 assert_eq!(sign_res.status(), StatusCode::OK);
685
686 let sign_body: Value = sign_res.json().await.unwrap();
687 let signed_operation = sign_body.get("operation")
688 .expect("Response should contain operation")
689 .clone();
690
691 assert!(signed_operation.get("sig").is_some());
692 assert!(signed_operation.get("prev").is_some());
693
694 let submit_res = client
695 .post(format!(
696 "{}/xrpc/com.atproto.identity.submitPlcOperation",
697 base_url().await
698 ))
699 .bearer_auth(&token)
700 .json(&json!({ "operation": signed_operation }))
701 .send()
702 .await
703 .expect("Submit failed");
704
705 let submit_status = submit_res.status();
706 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
707
708 assert_eq!(
709 submit_status,
710 StatusCode::OK,
711 "Full sign and submit flow should succeed. Response: {:?}",
712 submit_body
713 );
714}
715
716#[tokio::test]
717async fn test_cross_pds_migration_with_records() {
718 let client = client();
719 let (token, did) = create_account_and_login(&client).await;
720
721 let key_bytes = get_user_signing_key(&did).await
722 .expect("Failed to get user signing key");
723 let signing_key = SigningKey::from_slice(&key_bytes)
724 .expect("Failed to create signing key");
725
726 let handle = get_user_handle(&did).await
727 .expect("Failed to get user handle");
728
729 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
730 let pds_endpoint = format!("https://{}", hostname);
731
732 let post_payload = json!({
733 "repo": did,
734 "collection": "app.bsky.feed.post",
735 "record": {
736 "$type": "app.bsky.feed.post",
737 "text": "Test post before migration",
738 "createdAt": chrono::Utc::now().to_rfc3339(),
739 }
740 });
741
742 let create_res = client
743 .post(format!(
744 "{}/xrpc/com.atproto.repo.createRecord",
745 base_url().await
746 ))
747 .bearer_auth(&token)
748 .json(&post_payload)
749 .send()
750 .await
751 .expect("Failed to create post");
752 assert_eq!(create_res.status(), StatusCode::OK);
753
754 let create_body: Value = create_res.json().await.unwrap();
755 let original_uri = create_body["uri"].as_str().unwrap().to_string();
756
757 let export_res = client
758 .get(format!(
759 "{}/xrpc/com.atproto.sync.getRepo?did={}",
760 base_url().await,
761 did
762 ))
763 .send()
764 .await
765 .expect("Export failed");
766 assert_eq!(export_res.status(), StatusCode::OK);
767 let car_bytes = export_res.bytes().await.unwrap();
768
769 assert!(car_bytes.len() > 100, "CAR file should have meaningful content");
770
771 let mock_server = MockServer::start().await;
772 let did_encoded = urlencoding::encode(&did);
773 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
774
775 Mock::given(method("GET"))
776 .and(path(format!("/{}", did_encoded)))
777 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
778 .mount(&mock_server)
779 .await;
780
781 unsafe {
782 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
783 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
784 }
785
786 let import_res = client
787 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
788 .bearer_auth(&token)
789 .header("Content-Type", "application/vnd.ipld.car")
790 .body(car_bytes.to_vec())
791 .send()
792 .await
793 .expect("Import failed");
794
795 let import_status = import_res.status();
796 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
797
798 unsafe {
799 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
800 }
801
802 assert_eq!(
803 import_status,
804 StatusCode::OK,
805 "Import with valid DID document should succeed. Response: {:?}",
806 import_body
807 );
808
809 let get_record_res = client
810 .get(format!(
811 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}",
812 base_url().await,
813 did,
814 original_uri.split('/').last().unwrap()
815 ))
816 .send()
817 .await
818 .expect("Get record failed");
819
820 assert_eq!(
821 get_record_res.status(),
822 StatusCode::OK,
823 "Record should be retrievable after import"
824 );
825
826 let record_body: Value = get_record_res.json().await.unwrap();
827 assert_eq!(
828 record_body["value"]["text"],
829 "Test post before migration",
830 "Record content should match"
831 );
832}
833
834#[tokio::test]
835async fn test_migration_rejects_wrong_did_document() {
836 let client = client();
837 let (token, did) = create_account_and_login(&client).await;
838
839 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng());
840
841 let handle = get_user_handle(&did).await
842 .expect("Failed to get user handle");
843
844 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
845 let pds_endpoint = format!("https://{}", hostname);
846
847 let export_res = client
848 .get(format!(
849 "{}/xrpc/com.atproto.sync.getRepo?did={}",
850 base_url().await,
851 did
852 ))
853 .send()
854 .await
855 .expect("Export failed");
856 assert_eq!(export_res.status(), StatusCode::OK);
857 let car_bytes = export_res.bytes().await.unwrap();
858
859 let mock_server = MockServer::start().await;
860 let did_encoded = urlencoding::encode(&did);
861 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint);
862
863 Mock::given(method("GET"))
864 .and(path(format!("/{}", did_encoded)))
865 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc))
866 .mount(&mock_server)
867 .await;
868
869 unsafe {
870 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
871 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
872 }
873
874 let import_res = client
875 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
876 .bearer_auth(&token)
877 .header("Content-Type", "application/vnd.ipld.car")
878 .body(car_bytes.to_vec())
879 .send()
880 .await
881 .expect("Import failed");
882
883 let import_status = import_res.status();
884 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
885
886 unsafe {
887 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
888 }
889
890 assert_eq!(
891 import_status,
892 StatusCode::BAD_REQUEST,
893 "Import with wrong DID document should fail. Response: {:?}",
894 import_body
895 );
896
897 assert!(
898 import_body["error"] == "InvalidSignature" ||
899 import_body["message"].as_str().unwrap_or("").contains("signature"),
900 "Error should mention signature verification failure"
901 );
902}
903
904#[tokio::test]
905async fn test_full_migration_flow_end_to_end() {
906 let client = client();
907 let (token, did) = create_account_and_login(&client).await;
908
909 let key_bytes = get_user_signing_key(&did).await
910 .expect("Failed to get user signing key");
911 let signing_key = SigningKey::from_slice(&key_bytes)
912 .expect("Failed to create signing key");
913
914 let handle = get_user_handle(&did).await
915 .expect("Failed to get user handle");
916
917 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
918 let pds_endpoint = format!("https://{}", hostname);
919 let did_key = signing_key_to_did_key(&signing_key);
920
921 for i in 0..3 {
922 let post_payload = json!({
923 "repo": did,
924 "collection": "app.bsky.feed.post",
925 "record": {
926 "$type": "app.bsky.feed.post",
927 "text": format!("Pre-migration post #{}", i),
928 "createdAt": chrono::Utc::now().to_rfc3339(),
929 }
930 });
931
932 let res = client
933 .post(format!(
934 "{}/xrpc/com.atproto.repo.createRecord",
935 base_url().await
936 ))
937 .bearer_auth(&token)
938 .json(&post_payload)
939 .send()
940 .await
941 .expect("Failed to create post");
942 assert_eq!(res.status(), StatusCode::OK);
943 }
944
945 let request_res = client
946 .post(format!(
947 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
948 base_url().await
949 ))
950 .bearer_auth(&token)
951 .send()
952 .await
953 .expect("Request failed");
954 assert_eq!(request_res.status(), StatusCode::OK);
955
956 let plc_token = get_plc_token_from_db(&did).await
957 .expect("PLC token not found");
958
959 let mock_server = MockServer::start().await;
960 let did_encoded = urlencoding::encode(&did);
961
962 let last_op = json!({
963 "type": "plc_operation",
964 "rotationKeys": [did_key.clone()],
965 "verificationMethods": { "atproto": did_key.clone() },
966 "alsoKnownAs": [format!("at://{}", handle)],
967 "services": {
968 "atproto_pds": {
969 "type": "AtprotoPersonalDataServer",
970 "endpoint": pds_endpoint.clone()
971 }
972 },
973 "prev": null,
974 "sig": "initial_sig"
975 });
976
977 Mock::given(method("GET"))
978 .and(path(format!("/{}/log/last", did_encoded)))
979 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
980 .mount(&mock_server)
981 .await;
982
983 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
984 Mock::given(method("GET"))
985 .and(path(format!("/{}", did_encoded)))
986 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
987 .mount(&mock_server)
988 .await;
989
990 Mock::given(method("POST"))
991 .and(path(format!("/{}", did_encoded)))
992 .respond_with(ResponseTemplate::new(200))
993 .expect(1)
994 .mount(&mock_server)
995 .await;
996
997 unsafe {
998 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
999 }
1000
1001 let sign_res = client
1002 .post(format!(
1003 "{}/xrpc/com.atproto.identity.signPlcOperation",
1004 base_url().await
1005 ))
1006 .bearer_auth(&token)
1007 .json(&json!({ "token": plc_token }))
1008 .send()
1009 .await
1010 .expect("Sign failed");
1011 assert_eq!(sign_res.status(), StatusCode::OK);
1012
1013 let sign_body: Value = sign_res.json().await.unwrap();
1014 let signed_op = sign_body.get("operation").unwrap().clone();
1015
1016 let export_res = client
1017 .get(format!(
1018 "{}/xrpc/com.atproto.sync.getRepo?did={}",
1019 base_url().await,
1020 did
1021 ))
1022 .send()
1023 .await
1024 .expect("Export failed");
1025 assert_eq!(export_res.status(), StatusCode::OK);
1026 let car_bytes = export_res.bytes().await.unwrap();
1027
1028 let submit_res = client
1029 .post(format!(
1030 "{}/xrpc/com.atproto.identity.submitPlcOperation",
1031 base_url().await
1032 ))
1033 .bearer_auth(&token)
1034 .json(&json!({ "operation": signed_op }))
1035 .send()
1036 .await
1037 .expect("Submit failed");
1038 assert_eq!(submit_res.status(), StatusCode::OK);
1039
1040 unsafe {
1041 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
1042 }
1043
1044 let import_res = client
1045 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
1046 .bearer_auth(&token)
1047 .header("Content-Type", "application/vnd.ipld.car")
1048 .body(car_bytes.to_vec())
1049 .send()
1050 .await
1051 .expect("Import failed");
1052
1053 let import_status = import_res.status();
1054 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
1055
1056 unsafe {
1057 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
1058 }
1059
1060 assert_eq!(
1061 import_status,
1062 StatusCode::OK,
1063 "Full migration flow should succeed. Response: {:?}",
1064 import_body
1065 );
1066
1067 let list_res = client
1068 .get(format!(
1069 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post",
1070 base_url().await,
1071 did
1072 ))
1073 .send()
1074 .await
1075 .expect("List failed");
1076 assert_eq!(list_res.status(), StatusCode::OK);
1077
1078 let list_body: Value = list_res.json().await.unwrap();
1079 let records = list_body["records"].as_array()
1080 .expect("Should have records array");
1081
1082 assert!(
1083 records.len() >= 1,
1084 "Should have at least 1 record after migration, found {}",
1085 records.len()
1086 );
1087}