this repo has no description
1mod common; 2use common::*; 3 4use k256::ecdsa::SigningKey; 5use reqwest::StatusCode; 6use serde_json::{json, Value}; 7use sqlx::PgPool; 8use wiremock::matchers::{method, path}; 9use wiremock::{Mock, MockServer, ResponseTemplate}; 10 11fn encode_uvarint(mut x: u64) -> Vec<u8> { 12 let mut out = Vec::new(); 13 while x >= 0x80 { 14 out.push(((x as u8) & 0x7F) | 0x80); 15 x >>= 7; 16 } 17 out.push(x as u8); 18 out 19} 20 21fn signing_key_to_did_key(signing_key: &SigningKey) -> String { 22 let verifying_key = signing_key.verifying_key(); 23 let point = verifying_key.to_encoded_point(true); 24 let compressed_bytes = point.as_bytes(); 25 26 let mut prefixed = vec![0xe7, 0x01]; 27 prefixed.extend_from_slice(compressed_bytes); 28 29 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed); 30 format!("did:key:{}", encoded) 31} 32 33fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String { 34 let public_key = signing_key.verifying_key(); 35 let compressed = public_key.to_sec1_bytes(); 36 37 let mut buf = encode_uvarint(0xE7); 38 buf.extend_from_slice(&compressed); 39 multibase::encode(multibase::Base::Base58Btc, buf) 40} 41 42async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> { 43 let db_url = get_db_connection_string().await; 44 let pool = PgPool::connect(&db_url).await.ok()?; 45 46 let row = sqlx::query!( 47 r#" 48 SELECT k.key_bytes, k.encryption_version 49 FROM user_keys k 50 JOIN users u ON k.user_id = u.id 51 WHERE u.did = $1 52 "#, 53 did 54 ) 55 .fetch_optional(&pool) 56 .await 57 .ok()??; 58 59 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok() 60} 61 62async fn get_plc_token_from_db(did: &str) -> Option<String> { 63 let db_url = get_db_connection_string().await; 64 let pool = PgPool::connect(&db_url).await.ok()?; 65 66 sqlx::query_scalar!( 67 r#" 68 SELECT t.token 69 FROM plc_operation_tokens t 70 JOIN users u ON t.user_id = u.id 71 WHERE u.did = $1 72 "#, 73 did 74 ) 75 .fetch_optional(&pool) 76 .await 77 .ok()? 78} 79 80async fn get_user_handle(did: &str) -> Option<String> { 81 let db_url = get_db_connection_string().await; 82 let pool = PgPool::connect(&db_url).await.ok()?; 83 84 sqlx::query_scalar!( 85 r#"SELECT handle FROM users WHERE did = $1"#, 86 did 87 ) 88 .fetch_optional(&pool) 89 .await 90 .ok()? 91} 92 93fn create_mock_last_op( 94 _did: &str, 95 handle: &str, 96 signing_key: &SigningKey, 97 pds_endpoint: &str, 98) -> Value { 99 let did_key = signing_key_to_did_key(signing_key); 100 101 json!({ 102 "type": "plc_operation", 103 "rotationKeys": [did_key.clone()], 104 "verificationMethods": { 105 "atproto": did_key 106 }, 107 "alsoKnownAs": [format!("at://{}", handle)], 108 "services": { 109 "atproto_pds": { 110 "type": "AtprotoPersonalDataServer", 111 "endpoint": pds_endpoint 112 } 113 }, 114 "prev": null, 115 "sig": "mock_signature_for_testing" 116 }) 117} 118 119fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value { 120 let multikey = get_multikey_from_signing_key(signing_key); 121 122 json!({ 123 "@context": [ 124 "https://www.w3.org/ns/did/v1", 125 "https://w3id.org/security/multikey/v1" 126 ], 127 "id": did, 128 "alsoKnownAs": [format!("at://{}", handle)], 129 "verificationMethod": [{ 130 "id": format!("{}#atproto", did), 131 "type": "Multikey", 132 "controller": did, 133 "publicKeyMultibase": multikey 134 }], 135 "service": [{ 136 "id": "#atproto_pds", 137 "type": "AtprotoPersonalDataServer", 138 "serviceEndpoint": pds_endpoint 139 }] 140 }) 141} 142 143async fn setup_mock_plc_for_sign( 144 did: &str, 145 handle: &str, 146 signing_key: &SigningKey, 147 pds_endpoint: &str, 148) -> MockServer { 149 let mock_server = MockServer::start().await; 150 151 let did_encoded = urlencoding::encode(did); 152 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint); 153 154 Mock::given(method("GET")) 155 .and(path(format!("/{}/log/last", did_encoded))) 156 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 157 .mount(&mock_server) 158 .await; 159 160 mock_server 161} 162 163async fn setup_mock_plc_for_submit( 164 did: &str, 165 handle: &str, 166 signing_key: &SigningKey, 167 pds_endpoint: &str, 168) -> MockServer { 169 let mock_server = MockServer::start().await; 170 171 let did_encoded = urlencoding::encode(did); 172 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint); 173 174 Mock::given(method("GET")) 175 .and(path(format!("/{}", did_encoded))) 176 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone())) 177 .mount(&mock_server) 178 .await; 179 180 Mock::given(method("POST")) 181 .and(path(format!("/{}", did_encoded))) 182 .respond_with(ResponseTemplate::new(200)) 183 .mount(&mock_server) 184 .await; 185 186 mock_server 187} 188 189#[tokio::test] 190async fn test_full_plc_operation_flow() { 191 let client = client(); 192 let (token, did) = create_account_and_login(&client).await; 193 194 let key_bytes = get_user_signing_key(&did).await 195 .expect("Failed to get user signing key"); 196 let signing_key = SigningKey::from_slice(&key_bytes) 197 .expect("Failed to create signing key"); 198 199 let handle = get_user_handle(&did).await 200 .expect("Failed to get user handle"); 201 202 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 203 let pds_endpoint = format!("https://{}", hostname); 204 205 let request_res = client 206 .post(format!( 207 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 208 base_url().await 209 )) 210 .bearer_auth(&token) 211 .send() 212 .await 213 .expect("Request failed"); 214 215 assert_eq!(request_res.status(), StatusCode::OK); 216 217 let plc_token = get_plc_token_from_db(&did).await 218 .expect("PLC token not found in database"); 219 220 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 221 222 unsafe { 223 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 224 } 225 226 let sign_res = client 227 .post(format!( 228 "{}/xrpc/com.atproto.identity.signPlcOperation", 229 base_url().await 230 )) 231 .bearer_auth(&token) 232 .json(&json!({ 233 "token": plc_token 234 })) 235 .send() 236 .await 237 .expect("Sign request failed"); 238 239 let sign_status = sign_res.status(); 240 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 241 242 assert_eq!( 243 sign_status, 244 StatusCode::OK, 245 "Sign PLC operation should succeed. Response: {:?}", 246 sign_body 247 ); 248 249 let operation = sign_body.get("operation") 250 .expect("Response should contain operation"); 251 252 assert!(operation.get("sig").is_some(), "Operation should be signed"); 253 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation")); 254 assert!(operation.get("prev").is_some(), "Operation should have prev reference"); 255} 256 257#[tokio::test] 258#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"] 259async fn test_sign_plc_operation_consumes_token() { 260 let client = client(); 261 let (token, did) = create_account_and_login(&client).await; 262 263 let key_bytes = get_user_signing_key(&did).await 264 .expect("Failed to get user signing key"); 265 let signing_key = SigningKey::from_slice(&key_bytes) 266 .expect("Failed to create signing key"); 267 268 let handle = get_user_handle(&did).await 269 .expect("Failed to get user handle"); 270 271 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 272 let pds_endpoint = format!("https://{}", hostname); 273 274 let request_res = client 275 .post(format!( 276 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 277 base_url().await 278 )) 279 .bearer_auth(&token) 280 .send() 281 .await 282 .expect("Request failed"); 283 284 assert_eq!(request_res.status(), StatusCode::OK); 285 286 let plc_token = get_plc_token_from_db(&did).await 287 .expect("PLC token not found in database"); 288 289 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 290 291 unsafe { 292 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 293 } 294 295 let sign_res = client 296 .post(format!( 297 "{}/xrpc/com.atproto.identity.signPlcOperation", 298 base_url().await 299 )) 300 .bearer_auth(&token) 301 .json(&json!({ 302 "token": plc_token 303 })) 304 .send() 305 .await 306 .expect("Sign request failed"); 307 308 assert_eq!(sign_res.status(), StatusCode::OK); 309 310 let sign_res_2 = client 311 .post(format!( 312 "{}/xrpc/com.atproto.identity.signPlcOperation", 313 base_url().await 314 )) 315 .bearer_auth(&token) 316 .json(&json!({ 317 "token": plc_token 318 })) 319 .send() 320 .await 321 .expect("Second sign request failed"); 322 323 assert_eq!( 324 sign_res_2.status(), 325 StatusCode::BAD_REQUEST, 326 "Using the same token twice should fail" 327 ); 328 329 let body: Value = sign_res_2.json().await.unwrap(); 330 assert!( 331 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken", 332 "Error should indicate invalid/expired token" 333 ); 334} 335 336#[tokio::test] 337async fn test_sign_plc_operation_with_custom_fields() { 338 let client = client(); 339 let (token, did) = create_account_and_login(&client).await; 340 341 let key_bytes = get_user_signing_key(&did).await 342 .expect("Failed to get user signing key"); 343 let signing_key = SigningKey::from_slice(&key_bytes) 344 .expect("Failed to create signing key"); 345 346 let handle = get_user_handle(&did).await 347 .expect("Failed to get user handle"); 348 349 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 350 let pds_endpoint = format!("https://{}", hostname); 351 352 let request_res = client 353 .post(format!( 354 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 355 base_url().await 356 )) 357 .bearer_auth(&token) 358 .send() 359 .await 360 .expect("Request failed"); 361 362 assert_eq!(request_res.status(), StatusCode::OK); 363 364 let plc_token = get_plc_token_from_db(&did).await 365 .expect("PLC token not found in database"); 366 367 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 368 369 unsafe { 370 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 371 } 372 373 let did_key = signing_key_to_did_key(&signing_key); 374 375 let sign_res = client 376 .post(format!( 377 "{}/xrpc/com.atproto.identity.signPlcOperation", 378 base_url().await 379 )) 380 .bearer_auth(&token) 381 .json(&json!({ 382 "token": plc_token, 383 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"], 384 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"] 385 })) 386 .send() 387 .await 388 .expect("Sign request failed"); 389 390 let sign_status = sign_res.status(); 391 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 392 393 assert_eq!( 394 sign_status, 395 StatusCode::OK, 396 "Sign with custom fields should succeed. Response: {:?}", 397 sign_body 398 ); 399 400 let operation = sign_body.get("operation").expect("Should have operation"); 401 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array()); 402 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array()); 403 404 assert!(also_known_as.is_some(), "Should have alsoKnownAs"); 405 assert!(rotation_keys.is_some(), "Should have rotationKeys"); 406 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases"); 407 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys"); 408} 409 410#[tokio::test] 411async fn test_submit_plc_operation_success() { 412 let client = client(); 413 let (token, did) = create_account_and_login(&client).await; 414 415 let key_bytes = get_user_signing_key(&did).await 416 .expect("Failed to get user signing key"); 417 let signing_key = SigningKey::from_slice(&key_bytes) 418 .expect("Failed to create signing key"); 419 420 let handle = get_user_handle(&did).await 421 .expect("Failed to get user handle"); 422 423 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 424 let pds_endpoint = format!("https://{}", hostname); 425 426 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 427 428 unsafe { 429 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 430 } 431 432 let did_key = signing_key_to_did_key(&signing_key); 433 434 let operation = json!({ 435 "type": "plc_operation", 436 "rotationKeys": [did_key.clone()], 437 "verificationMethods": { 438 "atproto": did_key.clone() 439 }, 440 "alsoKnownAs": [format!("at://{}", handle)], 441 "services": { 442 "atproto_pds": { 443 "type": "AtprotoPersonalDataServer", 444 "endpoint": pds_endpoint 445 } 446 }, 447 "prev": "bafyreiabc123", 448 "sig": "test_signature_base64" 449 }); 450 451 let submit_res = client 452 .post(format!( 453 "{}/xrpc/com.atproto.identity.submitPlcOperation", 454 base_url().await 455 )) 456 .bearer_auth(&token) 457 .json(&json!({ "operation": operation })) 458 .send() 459 .await 460 .expect("Submit request failed"); 461 462 let submit_status = submit_res.status(); 463 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 464 465 assert_eq!( 466 submit_status, 467 StatusCode::OK, 468 "Submit PLC operation should succeed. Response: {:?}", 469 submit_body 470 ); 471} 472 473#[tokio::test] 474async fn test_submit_plc_operation_wrong_endpoint_rejected() { 475 let client = client(); 476 let (token, did) = create_account_and_login(&client).await; 477 478 let key_bytes = get_user_signing_key(&did).await 479 .expect("Failed to get user signing key"); 480 let signing_key = SigningKey::from_slice(&key_bytes) 481 .expect("Failed to create signing key"); 482 483 let handle = get_user_handle(&did).await 484 .expect("Failed to get user handle"); 485 486 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 487 let pds_endpoint = format!("https://{}", hostname); 488 489 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 490 491 unsafe { 492 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 493 } 494 495 let did_key = signing_key_to_did_key(&signing_key); 496 497 let operation = json!({ 498 "type": "plc_operation", 499 "rotationKeys": [did_key.clone()], 500 "verificationMethods": { 501 "atproto": did_key.clone() 502 }, 503 "alsoKnownAs": [format!("at://{}", handle)], 504 "services": { 505 "atproto_pds": { 506 "type": "AtprotoPersonalDataServer", 507 "endpoint": "https://wrong-pds.example.com" 508 } 509 }, 510 "prev": "bafyreiabc123", 511 "sig": "test_signature_base64" 512 }); 513 514 let submit_res = client 515 .post(format!( 516 "{}/xrpc/com.atproto.identity.submitPlcOperation", 517 base_url().await 518 )) 519 .bearer_auth(&token) 520 .json(&json!({ "operation": operation })) 521 .send() 522 .await 523 .expect("Submit request failed"); 524 525 assert_eq!( 526 submit_res.status(), 527 StatusCode::BAD_REQUEST, 528 "Submit with wrong endpoint should fail" 529 ); 530 531 let body: Value = submit_res.json().await.unwrap(); 532 assert_eq!(body["error"], "InvalidRequest"); 533} 534 535#[tokio::test] 536async fn test_submit_plc_operation_wrong_signing_key_rejected() { 537 let client = client(); 538 let (token, did) = create_account_and_login(&client).await; 539 540 let key_bytes = get_user_signing_key(&did).await 541 .expect("Failed to get user signing key"); 542 let signing_key = SigningKey::from_slice(&key_bytes) 543 .expect("Failed to create signing key"); 544 545 let handle = get_user_handle(&did).await 546 .expect("Failed to get user handle"); 547 548 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 549 let pds_endpoint = format!("https://{}", hostname); 550 551 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 552 553 unsafe { 554 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 555 } 556 557 let wrong_key = SigningKey::random(&mut rand::thread_rng()); 558 let wrong_did_key = signing_key_to_did_key(&wrong_key); 559 let correct_did_key = signing_key_to_did_key(&signing_key); 560 561 let operation = json!({ 562 "type": "plc_operation", 563 "rotationKeys": [correct_did_key.clone()], 564 "verificationMethods": { 565 "atproto": wrong_did_key 566 }, 567 "alsoKnownAs": [format!("at://{}", handle)], 568 "services": { 569 "atproto_pds": { 570 "type": "AtprotoPersonalDataServer", 571 "endpoint": pds_endpoint 572 } 573 }, 574 "prev": "bafyreiabc123", 575 "sig": "test_signature_base64" 576 }); 577 578 let submit_res = client 579 .post(format!( 580 "{}/xrpc/com.atproto.identity.submitPlcOperation", 581 base_url().await 582 )) 583 .bearer_auth(&token) 584 .json(&json!({ "operation": operation })) 585 .send() 586 .await 587 .expect("Submit request failed"); 588 589 assert_eq!( 590 submit_res.status(), 591 StatusCode::BAD_REQUEST, 592 "Submit with wrong signing key should fail" 593 ); 594 595 let body: Value = submit_res.json().await.unwrap(); 596 assert_eq!(body["error"], "InvalidRequest"); 597} 598 599#[tokio::test] 600async fn test_full_sign_and_submit_flow() { 601 let client = client(); 602 let (token, did) = create_account_and_login(&client).await; 603 604 let key_bytes = get_user_signing_key(&did).await 605 .expect("Failed to get user signing key"); 606 let signing_key = SigningKey::from_slice(&key_bytes) 607 .expect("Failed to create signing key"); 608 609 let handle = get_user_handle(&did).await 610 .expect("Failed to get user handle"); 611 612 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 613 let pds_endpoint = format!("https://{}", hostname); 614 615 let request_res = client 616 .post(format!( 617 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 618 base_url().await 619 )) 620 .bearer_auth(&token) 621 .send() 622 .await 623 .expect("Request failed"); 624 assert_eq!(request_res.status(), StatusCode::OK); 625 626 let plc_token = get_plc_token_from_db(&did).await 627 .expect("PLC token not found"); 628 629 let mock_server = MockServer::start().await; 630 let did_encoded = urlencoding::encode(&did); 631 let did_key = signing_key_to_did_key(&signing_key); 632 633 let last_op = json!({ 634 "type": "plc_operation", 635 "rotationKeys": [did_key.clone()], 636 "verificationMethods": { 637 "atproto": did_key.clone() 638 }, 639 "alsoKnownAs": [format!("at://{}", handle)], 640 "services": { 641 "atproto_pds": { 642 "type": "AtprotoPersonalDataServer", 643 "endpoint": pds_endpoint.clone() 644 } 645 }, 646 "prev": null, 647 "sig": "initial_sig" 648 }); 649 650 Mock::given(method("GET")) 651 .and(path(format!("/{}/log/last", did_encoded))) 652 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 653 .mount(&mock_server) 654 .await; 655 656 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 657 Mock::given(method("GET")) 658 .and(path(format!("/{}", did_encoded))) 659 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 660 .mount(&mock_server) 661 .await; 662 663 Mock::given(method("POST")) 664 .and(path(format!("/{}", did_encoded))) 665 .respond_with(ResponseTemplate::new(200)) 666 .expect(1) 667 .mount(&mock_server) 668 .await; 669 670 unsafe { 671 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 672 } 673 674 let sign_res = client 675 .post(format!( 676 "{}/xrpc/com.atproto.identity.signPlcOperation", 677 base_url().await 678 )) 679 .bearer_auth(&token) 680 .json(&json!({ "token": plc_token })) 681 .send() 682 .await 683 .expect("Sign failed"); 684 685 assert_eq!(sign_res.status(), StatusCode::OK); 686 687 let sign_body: Value = sign_res.json().await.unwrap(); 688 let signed_operation = sign_body.get("operation") 689 .expect("Response should contain operation") 690 .clone(); 691 692 assert!(signed_operation.get("sig").is_some()); 693 assert!(signed_operation.get("prev").is_some()); 694 695 let submit_res = client 696 .post(format!( 697 "{}/xrpc/com.atproto.identity.submitPlcOperation", 698 base_url().await 699 )) 700 .bearer_auth(&token) 701 .json(&json!({ "operation": signed_operation })) 702 .send() 703 .await 704 .expect("Submit failed"); 705 706 let submit_status = submit_res.status(); 707 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 708 709 assert_eq!( 710 submit_status, 711 StatusCode::OK, 712 "Full sign and submit flow should succeed. Response: {:?}", 713 submit_body 714 ); 715} 716 717#[tokio::test] 718async fn test_cross_pds_migration_with_records() { 719 let client = client(); 720 let (token, did) = create_account_and_login(&client).await; 721 722 let key_bytes = get_user_signing_key(&did).await 723 .expect("Failed to get user signing key"); 724 let signing_key = SigningKey::from_slice(&key_bytes) 725 .expect("Failed to create signing key"); 726 727 let handle = get_user_handle(&did).await 728 .expect("Failed to get user handle"); 729 730 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 731 let pds_endpoint = format!("https://{}", hostname); 732 733 let post_payload = json!({ 734 "repo": did, 735 "collection": "app.bsky.feed.post", 736 "record": { 737 "$type": "app.bsky.feed.post", 738 "text": "Test post before migration", 739 "createdAt": chrono::Utc::now().to_rfc3339(), 740 } 741 }); 742 743 let create_res = client 744 .post(format!( 745 "{}/xrpc/com.atproto.repo.createRecord", 746 base_url().await 747 )) 748 .bearer_auth(&token) 749 .json(&post_payload) 750 .send() 751 .await 752 .expect("Failed to create post"); 753 assert_eq!(create_res.status(), StatusCode::OK); 754 755 let create_body: Value = create_res.json().await.unwrap(); 756 let original_uri = create_body["uri"].as_str().unwrap().to_string(); 757 758 let export_res = client 759 .get(format!( 760 "{}/xrpc/com.atproto.sync.getRepo?did={}", 761 base_url().await, 762 did 763 )) 764 .send() 765 .await 766 .expect("Export failed"); 767 assert_eq!(export_res.status(), StatusCode::OK); 768 let car_bytes = export_res.bytes().await.unwrap(); 769 770 assert!(car_bytes.len() > 100, "CAR file should have meaningful content"); 771 772 let mock_server = MockServer::start().await; 773 let did_encoded = urlencoding::encode(&did); 774 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 775 776 Mock::given(method("GET")) 777 .and(path(format!("/{}", did_encoded))) 778 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 779 .mount(&mock_server) 780 .await; 781 782 unsafe { 783 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 784 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 785 } 786 787 let import_res = client 788 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 789 .bearer_auth(&token) 790 .header("Content-Type", "application/vnd.ipld.car") 791 .body(car_bytes.to_vec()) 792 .send() 793 .await 794 .expect("Import failed"); 795 796 let import_status = import_res.status(); 797 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 798 799 unsafe { 800 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 801 } 802 803 assert_eq!( 804 import_status, 805 StatusCode::OK, 806 "Import with valid DID document should succeed. Response: {:?}", 807 import_body 808 ); 809 810 let get_record_res = client 811 .get(format!( 812 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}", 813 base_url().await, 814 did, 815 original_uri.split('/').last().unwrap() 816 )) 817 .send() 818 .await 819 .expect("Get record failed"); 820 821 assert_eq!( 822 get_record_res.status(), 823 StatusCode::OK, 824 "Record should be retrievable after import" 825 ); 826 827 let record_body: Value = get_record_res.json().await.unwrap(); 828 assert_eq!( 829 record_body["value"]["text"], 830 "Test post before migration", 831 "Record content should match" 832 ); 833} 834 835#[tokio::test] 836async fn test_migration_rejects_wrong_did_document() { 837 let client = client(); 838 let (token, did) = create_account_and_login(&client).await; 839 840 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng()); 841 842 let handle = get_user_handle(&did).await 843 .expect("Failed to get user handle"); 844 845 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 846 let pds_endpoint = format!("https://{}", hostname); 847 848 let export_res = client 849 .get(format!( 850 "{}/xrpc/com.atproto.sync.getRepo?did={}", 851 base_url().await, 852 did 853 )) 854 .send() 855 .await 856 .expect("Export failed"); 857 assert_eq!(export_res.status(), StatusCode::OK); 858 let car_bytes = export_res.bytes().await.unwrap(); 859 860 let mock_server = MockServer::start().await; 861 let did_encoded = urlencoding::encode(&did); 862 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint); 863 864 Mock::given(method("GET")) 865 .and(path(format!("/{}", did_encoded))) 866 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc)) 867 .mount(&mock_server) 868 .await; 869 870 unsafe { 871 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 872 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 873 } 874 875 let import_res = client 876 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 877 .bearer_auth(&token) 878 .header("Content-Type", "application/vnd.ipld.car") 879 .body(car_bytes.to_vec()) 880 .send() 881 .await 882 .expect("Import failed"); 883 884 let import_status = import_res.status(); 885 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 886 887 unsafe { 888 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 889 } 890 891 assert_eq!( 892 import_status, 893 StatusCode::BAD_REQUEST, 894 "Import with wrong DID document should fail. Response: {:?}", 895 import_body 896 ); 897 898 assert!( 899 import_body["error"] == "InvalidSignature" || 900 import_body["message"].as_str().unwrap_or("").contains("signature"), 901 "Error should mention signature verification failure" 902 ); 903} 904 905#[tokio::test] 906#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"] 907async fn test_full_migration_flow_end_to_end() { 908 let client = client(); 909 let (token, did) = create_account_and_login(&client).await; 910 911 let key_bytes = get_user_signing_key(&did).await 912 .expect("Failed to get user signing key"); 913 let signing_key = SigningKey::from_slice(&key_bytes) 914 .expect("Failed to create signing key"); 915 916 let handle = get_user_handle(&did).await 917 .expect("Failed to get user handle"); 918 919 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 920 let pds_endpoint = format!("https://{}", hostname); 921 let did_key = signing_key_to_did_key(&signing_key); 922 923 for i in 0..3 { 924 let post_payload = json!({ 925 "repo": did, 926 "collection": "app.bsky.feed.post", 927 "record": { 928 "$type": "app.bsky.feed.post", 929 "text": format!("Pre-migration post #{}", i), 930 "createdAt": chrono::Utc::now().to_rfc3339(), 931 } 932 }); 933 934 let res = client 935 .post(format!( 936 "{}/xrpc/com.atproto.repo.createRecord", 937 base_url().await 938 )) 939 .bearer_auth(&token) 940 .json(&post_payload) 941 .send() 942 .await 943 .expect("Failed to create post"); 944 assert_eq!(res.status(), StatusCode::OK); 945 } 946 947 let request_res = client 948 .post(format!( 949 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 950 base_url().await 951 )) 952 .bearer_auth(&token) 953 .send() 954 .await 955 .expect("Request failed"); 956 assert_eq!(request_res.status(), StatusCode::OK); 957 958 let plc_token = get_plc_token_from_db(&did).await 959 .expect("PLC token not found"); 960 961 let mock_server = MockServer::start().await; 962 let did_encoded = urlencoding::encode(&did); 963 964 let last_op = json!({ 965 "type": "plc_operation", 966 "rotationKeys": [did_key.clone()], 967 "verificationMethods": { "atproto": did_key.clone() }, 968 "alsoKnownAs": [format!("at://{}", handle)], 969 "services": { 970 "atproto_pds": { 971 "type": "AtprotoPersonalDataServer", 972 "endpoint": pds_endpoint.clone() 973 } 974 }, 975 "prev": null, 976 "sig": "initial_sig" 977 }); 978 979 Mock::given(method("GET")) 980 .and(path(format!("/{}/log/last", did_encoded))) 981 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 982 .mount(&mock_server) 983 .await; 984 985 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 986 Mock::given(method("GET")) 987 .and(path(format!("/{}", did_encoded))) 988 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 989 .mount(&mock_server) 990 .await; 991 992 Mock::given(method("POST")) 993 .and(path(format!("/{}", did_encoded))) 994 .respond_with(ResponseTemplate::new(200)) 995 .expect(1) 996 .mount(&mock_server) 997 .await; 998 999 unsafe { 1000 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 1001 } 1002 1003 let sign_res = client 1004 .post(format!( 1005 "{}/xrpc/com.atproto.identity.signPlcOperation", 1006 base_url().await 1007 )) 1008 .bearer_auth(&token) 1009 .json(&json!({ "token": plc_token })) 1010 .send() 1011 .await 1012 .expect("Sign failed"); 1013 assert_eq!(sign_res.status(), StatusCode::OK); 1014 1015 let sign_body: Value = sign_res.json().await.unwrap(); 1016 let signed_op = sign_body.get("operation").unwrap().clone(); 1017 1018 let export_res = client 1019 .get(format!( 1020 "{}/xrpc/com.atproto.sync.getRepo?did={}", 1021 base_url().await, 1022 did 1023 )) 1024 .send() 1025 .await 1026 .expect("Export failed"); 1027 assert_eq!(export_res.status(), StatusCode::OK); 1028 let car_bytes = export_res.bytes().await.unwrap(); 1029 1030 let submit_res = client 1031 .post(format!( 1032 "{}/xrpc/com.atproto.identity.submitPlcOperation", 1033 base_url().await 1034 )) 1035 .bearer_auth(&token) 1036 .json(&json!({ "operation": signed_op })) 1037 .send() 1038 .await 1039 .expect("Submit failed"); 1040 assert_eq!(submit_res.status(), StatusCode::OK); 1041 1042 unsafe { 1043 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 1044 } 1045 1046 let import_res = client 1047 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 1048 .bearer_auth(&token) 1049 .header("Content-Type", "application/vnd.ipld.car") 1050 .body(car_bytes.to_vec()) 1051 .send() 1052 .await 1053 .expect("Import failed"); 1054 1055 let import_status = import_res.status(); 1056 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 1057 1058 unsafe { 1059 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 1060 } 1061 1062 assert_eq!( 1063 import_status, 1064 StatusCode::OK, 1065 "Full migration flow should succeed. Response: {:?}", 1066 import_body 1067 ); 1068 1069 let list_res = client 1070 .get(format!( 1071 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post", 1072 base_url().await, 1073 did 1074 )) 1075 .send() 1076 .await 1077 .expect("List failed"); 1078 assert_eq!(list_res.status(), StatusCode::OK); 1079 1080 let list_body: Value = list_res.json().await.unwrap(); 1081 let records = list_body["records"].as_array() 1082 .expect("Should have records array"); 1083 1084 assert!( 1085 records.len() >= 1, 1086 "Should have at least 1 record after migration, found {}", 1087 records.len() 1088 ); 1089}