this repo has no description
1mod common; 2use common::*; 3use k256::ecdsa::SigningKey; 4use reqwest::StatusCode; 5use serde_json::{json, Value}; 6use sqlx::PgPool; 7use wiremock::matchers::{method, path}; 8use wiremock::{Mock, MockServer, ResponseTemplate}; 9 10fn encode_uvarint(mut x: u64) -> Vec<u8> { 11 let mut out = Vec::new(); 12 while x >= 0x80 { 13 out.push(((x as u8) & 0x7F) | 0x80); 14 x >>= 7; 15 } 16 out.push(x as u8); 17 out 18} 19 20fn signing_key_to_did_key(signing_key: &SigningKey) -> String { 21 let verifying_key = signing_key.verifying_key(); 22 let point = verifying_key.to_encoded_point(true); 23 let compressed_bytes = point.as_bytes(); 24 let mut prefixed = vec![0xe7, 0x01]; 25 prefixed.extend_from_slice(compressed_bytes); 26 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed); 27 format!("did:key:{}", encoded) 28} 29 30fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String { 31 let public_key = signing_key.verifying_key(); 32 let compressed = public_key.to_sec1_bytes(); 33 let mut buf = encode_uvarint(0xE7); 34 buf.extend_from_slice(&compressed); 35 multibase::encode(multibase::Base::Base58Btc, buf) 36} 37 38async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> { 39 let db_url = get_db_connection_string().await; 40 let pool = PgPool::connect(&db_url).await.ok()?; 41 let row = sqlx::query!( 42 r#" 43 SELECT k.key_bytes, k.encryption_version 44 FROM user_keys k 45 JOIN users u ON k.user_id = u.id 46 WHERE u.did = $1 47 "#, 48 did 49 ) 50 .fetch_optional(&pool) 51 .await 52 .ok()??; 53 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok() 54} 55 56async fn get_plc_token_from_db(did: &str) -> Option<String> { 57 let db_url = get_db_connection_string().await; 58 let pool = PgPool::connect(&db_url).await.ok()?; 59 sqlx::query_scalar!( 60 r#" 61 SELECT t.token 62 FROM plc_operation_tokens t 63 JOIN users u ON t.user_id = u.id 64 WHERE u.did = $1 65 "#, 66 did 67 ) 68 .fetch_optional(&pool) 69 .await 70 .ok()? 71} 72 73async fn get_user_handle(did: &str) -> Option<String> { 74 let db_url = get_db_connection_string().await; 75 let pool = PgPool::connect(&db_url).await.ok()?; 76 sqlx::query_scalar!( 77 r#"SELECT handle FROM users WHERE did = $1"#, 78 did 79 ) 80 .fetch_optional(&pool) 81 .await 82 .ok()? 83} 84 85fn create_mock_last_op( 86 _did: &str, 87 handle: &str, 88 signing_key: &SigningKey, 89 pds_endpoint: &str, 90) -> Value { 91 let did_key = signing_key_to_did_key(signing_key); 92 json!({ 93 "type": "plc_operation", 94 "rotationKeys": [did_key.clone()], 95 "verificationMethods": { 96 "atproto": did_key 97 }, 98 "alsoKnownAs": [format!("at://{}", handle)], 99 "services": { 100 "atproto_pds": { 101 "type": "AtprotoPersonalDataServer", 102 "endpoint": pds_endpoint 103 } 104 }, 105 "prev": null, 106 "sig": "mock_signature_for_testing" 107 }) 108} 109 110fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value { 111 let multikey = get_multikey_from_signing_key(signing_key); 112 json!({ 113 "@context": [ 114 "https://www.w3.org/ns/did/v1", 115 "https://w3id.org/security/multikey/v1" 116 ], 117 "id": did, 118 "alsoKnownAs": [format!("at://{}", handle)], 119 "verificationMethod": [{ 120 "id": format!("{}#atproto", did), 121 "type": "Multikey", 122 "controller": did, 123 "publicKeyMultibase": multikey 124 }], 125 "service": [{ 126 "id": "#atproto_pds", 127 "type": "AtprotoPersonalDataServer", 128 "serviceEndpoint": pds_endpoint 129 }] 130 }) 131} 132 133async fn setup_mock_plc_for_sign( 134 did: &str, 135 handle: &str, 136 signing_key: &SigningKey, 137 pds_endpoint: &str, 138) -> MockServer { 139 let mock_server = MockServer::start().await; 140 let did_encoded = urlencoding::encode(did); 141 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint); 142 Mock::given(method("GET")) 143 .and(path(format!("/{}/log/last", did_encoded))) 144 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 145 .mount(&mock_server) 146 .await; 147 mock_server 148} 149 150async fn setup_mock_plc_for_submit( 151 did: &str, 152 handle: &str, 153 signing_key: &SigningKey, 154 pds_endpoint: &str, 155) -> MockServer { 156 let mock_server = MockServer::start().await; 157 let did_encoded = urlencoding::encode(did); 158 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint); 159 Mock::given(method("GET")) 160 .and(path(format!("/{}", did_encoded))) 161 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone())) 162 .mount(&mock_server) 163 .await; 164 Mock::given(method("POST")) 165 .and(path(format!("/{}", did_encoded))) 166 .respond_with(ResponseTemplate::new(200)) 167 .mount(&mock_server) 168 .await; 169 mock_server 170} 171 172#[tokio::test] 173#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 174async fn test_full_plc_operation_flow() { 175 let client = client(); 176 let (token, did) = create_account_and_login(&client).await; 177 let key_bytes = get_user_signing_key(&did).await 178 .expect("Failed to get user signing key"); 179 let signing_key = SigningKey::from_slice(&key_bytes) 180 .expect("Failed to create signing key"); 181 let handle = get_user_handle(&did).await 182 .expect("Failed to get user handle"); 183 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 184 let pds_endpoint = format!("https://{}", hostname); 185 let request_res = client 186 .post(format!( 187 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 188 base_url().await 189 )) 190 .bearer_auth(&token) 191 .send() 192 .await 193 .expect("Request failed"); 194 assert_eq!(request_res.status(), StatusCode::OK); 195 let plc_token = get_plc_token_from_db(&did).await 196 .expect("PLC token not found in database"); 197 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 198 unsafe { 199 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 200 } 201 let sign_res = client 202 .post(format!( 203 "{}/xrpc/com.atproto.identity.signPlcOperation", 204 base_url().await 205 )) 206 .bearer_auth(&token) 207 .json(&json!({ 208 "token": plc_token 209 })) 210 .send() 211 .await 212 .expect("Sign request failed"); 213 let sign_status = sign_res.status(); 214 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 215 assert_eq!( 216 sign_status, 217 StatusCode::OK, 218 "Sign PLC operation should succeed. Response: {:?}", 219 sign_body 220 ); 221 let operation = sign_body.get("operation") 222 .expect("Response should contain operation"); 223 assert!(operation.get("sig").is_some(), "Operation should be signed"); 224 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation")); 225 assert!(operation.get("prev").is_some(), "Operation should have prev reference"); 226} 227 228#[tokio::test] 229#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"] 230async fn test_sign_plc_operation_consumes_token() { 231 let client = client(); 232 let (token, did) = create_account_and_login(&client).await; 233 let key_bytes = get_user_signing_key(&did).await 234 .expect("Failed to get user signing key"); 235 let signing_key = SigningKey::from_slice(&key_bytes) 236 .expect("Failed to create signing key"); 237 let handle = get_user_handle(&did).await 238 .expect("Failed to get user handle"); 239 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 240 let pds_endpoint = format!("https://{}", hostname); 241 let request_res = client 242 .post(format!( 243 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 244 base_url().await 245 )) 246 .bearer_auth(&token) 247 .send() 248 .await 249 .expect("Request failed"); 250 assert_eq!(request_res.status(), StatusCode::OK); 251 let plc_token = get_plc_token_from_db(&did).await 252 .expect("PLC token not found in database"); 253 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 254 unsafe { 255 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 256 } 257 let sign_res = client 258 .post(format!( 259 "{}/xrpc/com.atproto.identity.signPlcOperation", 260 base_url().await 261 )) 262 .bearer_auth(&token) 263 .json(&json!({ 264 "token": plc_token 265 })) 266 .send() 267 .await 268 .expect("Sign request failed"); 269 assert_eq!(sign_res.status(), StatusCode::OK); 270 let sign_res_2 = client 271 .post(format!( 272 "{}/xrpc/com.atproto.identity.signPlcOperation", 273 base_url().await 274 )) 275 .bearer_auth(&token) 276 .json(&json!({ 277 "token": plc_token 278 })) 279 .send() 280 .await 281 .expect("Second sign request failed"); 282 assert_eq!( 283 sign_res_2.status(), 284 StatusCode::BAD_REQUEST, 285 "Using the same token twice should fail" 286 ); 287 let body: Value = sign_res_2.json().await.unwrap(); 288 assert!( 289 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken", 290 "Error should indicate invalid/expired token" 291 ); 292} 293 294#[tokio::test] 295async fn test_sign_plc_operation_with_custom_fields() { 296 let client = client(); 297 let (token, did) = create_account_and_login(&client).await; 298 let key_bytes = get_user_signing_key(&did).await 299 .expect("Failed to get user signing key"); 300 let signing_key = SigningKey::from_slice(&key_bytes) 301 .expect("Failed to create signing key"); 302 let handle = get_user_handle(&did).await 303 .expect("Failed to get user handle"); 304 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 305 let pds_endpoint = format!("https://{}", hostname); 306 let request_res = client 307 .post(format!( 308 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 309 base_url().await 310 )) 311 .bearer_auth(&token) 312 .send() 313 .await 314 .expect("Request failed"); 315 assert_eq!(request_res.status(), StatusCode::OK); 316 let plc_token = get_plc_token_from_db(&did).await 317 .expect("PLC token not found in database"); 318 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 319 unsafe { 320 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 321 } 322 let did_key = signing_key_to_did_key(&signing_key); 323 let sign_res = client 324 .post(format!( 325 "{}/xrpc/com.atproto.identity.signPlcOperation", 326 base_url().await 327 )) 328 .bearer_auth(&token) 329 .json(&json!({ 330 "token": plc_token, 331 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"], 332 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"] 333 })) 334 .send() 335 .await 336 .expect("Sign request failed"); 337 let sign_status = sign_res.status(); 338 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 339 assert_eq!( 340 sign_status, 341 StatusCode::OK, 342 "Sign with custom fields should succeed. Response: {:?}", 343 sign_body 344 ); 345 let operation = sign_body.get("operation").expect("Should have operation"); 346 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array()); 347 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array()); 348 assert!(also_known_as.is_some(), "Should have alsoKnownAs"); 349 assert!(rotation_keys.is_some(), "Should have rotationKeys"); 350 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases"); 351 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys"); 352} 353 354#[tokio::test] 355#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 356async fn test_submit_plc_operation_success() { 357 let client = client(); 358 let (token, did) = create_account_and_login(&client).await; 359 let key_bytes = get_user_signing_key(&did).await 360 .expect("Failed to get user signing key"); 361 let signing_key = SigningKey::from_slice(&key_bytes) 362 .expect("Failed to create signing key"); 363 let handle = get_user_handle(&did).await 364 .expect("Failed to get user handle"); 365 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 366 let pds_endpoint = format!("https://{}", hostname); 367 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 368 unsafe { 369 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 370 } 371 let did_key = signing_key_to_did_key(&signing_key); 372 let operation = json!({ 373 "type": "plc_operation", 374 "rotationKeys": [did_key.clone()], 375 "verificationMethods": { 376 "atproto": did_key.clone() 377 }, 378 "alsoKnownAs": [format!("at://{}", handle)], 379 "services": { 380 "atproto_pds": { 381 "type": "AtprotoPersonalDataServer", 382 "endpoint": pds_endpoint 383 } 384 }, 385 "prev": "bafyreiabc123", 386 "sig": "test_signature_base64" 387 }); 388 let submit_res = client 389 .post(format!( 390 "{}/xrpc/com.atproto.identity.submitPlcOperation", 391 base_url().await 392 )) 393 .bearer_auth(&token) 394 .json(&json!({ "operation": operation })) 395 .send() 396 .await 397 .expect("Submit request failed"); 398 let submit_status = submit_res.status(); 399 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 400 assert_eq!( 401 submit_status, 402 StatusCode::OK, 403 "Submit PLC operation should succeed. Response: {:?}", 404 submit_body 405 ); 406} 407 408#[tokio::test] 409async fn test_submit_plc_operation_wrong_endpoint_rejected() { 410 let client = client(); 411 let (token, did) = create_account_and_login(&client).await; 412 let key_bytes = get_user_signing_key(&did).await 413 .expect("Failed to get user signing key"); 414 let signing_key = SigningKey::from_slice(&key_bytes) 415 .expect("Failed to create signing key"); 416 let handle = get_user_handle(&did).await 417 .expect("Failed to get user handle"); 418 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 419 let pds_endpoint = format!("https://{}", hostname); 420 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 421 unsafe { 422 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 423 } 424 let did_key = signing_key_to_did_key(&signing_key); 425 let operation = json!({ 426 "type": "plc_operation", 427 "rotationKeys": [did_key.clone()], 428 "verificationMethods": { 429 "atproto": did_key.clone() 430 }, 431 "alsoKnownAs": [format!("at://{}", handle)], 432 "services": { 433 "atproto_pds": { 434 "type": "AtprotoPersonalDataServer", 435 "endpoint": "https://wrong-pds.example.com" 436 } 437 }, 438 "prev": "bafyreiabc123", 439 "sig": "test_signature_base64" 440 }); 441 let submit_res = client 442 .post(format!( 443 "{}/xrpc/com.atproto.identity.submitPlcOperation", 444 base_url().await 445 )) 446 .bearer_auth(&token) 447 .json(&json!({ "operation": operation })) 448 .send() 449 .await 450 .expect("Submit request failed"); 451 assert_eq!( 452 submit_res.status(), 453 StatusCode::BAD_REQUEST, 454 "Submit with wrong endpoint should fail" 455 ); 456 let body: Value = submit_res.json().await.unwrap(); 457 assert_eq!(body["error"], "InvalidRequest"); 458} 459 460#[tokio::test] 461async fn test_submit_plc_operation_wrong_signing_key_rejected() { 462 let client = client(); 463 let (token, did) = create_account_and_login(&client).await; 464 let key_bytes = get_user_signing_key(&did).await 465 .expect("Failed to get user signing key"); 466 let signing_key = SigningKey::from_slice(&key_bytes) 467 .expect("Failed to create signing key"); 468 let handle = get_user_handle(&did).await 469 .expect("Failed to get user handle"); 470 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 471 let pds_endpoint = format!("https://{}", hostname); 472 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 473 unsafe { 474 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 475 } 476 let wrong_key = SigningKey::random(&mut rand::thread_rng()); 477 let wrong_did_key = signing_key_to_did_key(&wrong_key); 478 let correct_did_key = signing_key_to_did_key(&signing_key); 479 let operation = json!({ 480 "type": "plc_operation", 481 "rotationKeys": [correct_did_key.clone()], 482 "verificationMethods": { 483 "atproto": wrong_did_key 484 }, 485 "alsoKnownAs": [format!("at://{}", handle)], 486 "services": { 487 "atproto_pds": { 488 "type": "AtprotoPersonalDataServer", 489 "endpoint": pds_endpoint 490 } 491 }, 492 "prev": "bafyreiabc123", 493 "sig": "test_signature_base64" 494 }); 495 let submit_res = client 496 .post(format!( 497 "{}/xrpc/com.atproto.identity.submitPlcOperation", 498 base_url().await 499 )) 500 .bearer_auth(&token) 501 .json(&json!({ "operation": operation })) 502 .send() 503 .await 504 .expect("Submit request failed"); 505 assert_eq!( 506 submit_res.status(), 507 StatusCode::BAD_REQUEST, 508 "Submit with wrong signing key should fail" 509 ); 510 let body: Value = submit_res.json().await.unwrap(); 511 assert_eq!(body["error"], "InvalidRequest"); 512} 513 514#[tokio::test] 515async fn test_full_sign_and_submit_flow() { 516 let client = client(); 517 let (token, did) = create_account_and_login(&client).await; 518 let key_bytes = get_user_signing_key(&did).await 519 .expect("Failed to get user signing key"); 520 let signing_key = SigningKey::from_slice(&key_bytes) 521 .expect("Failed to create signing key"); 522 let handle = get_user_handle(&did).await 523 .expect("Failed to get user handle"); 524 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 525 let pds_endpoint = format!("https://{}", hostname); 526 let request_res = client 527 .post(format!( 528 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 529 base_url().await 530 )) 531 .bearer_auth(&token) 532 .send() 533 .await 534 .expect("Request failed"); 535 assert_eq!(request_res.status(), StatusCode::OK); 536 let plc_token = get_plc_token_from_db(&did).await 537 .expect("PLC token not found"); 538 let mock_server = MockServer::start().await; 539 let did_encoded = urlencoding::encode(&did); 540 let did_key = signing_key_to_did_key(&signing_key); 541 let last_op = json!({ 542 "type": "plc_operation", 543 "rotationKeys": [did_key.clone()], 544 "verificationMethods": { 545 "atproto": did_key.clone() 546 }, 547 "alsoKnownAs": [format!("at://{}", handle)], 548 "services": { 549 "atproto_pds": { 550 "type": "AtprotoPersonalDataServer", 551 "endpoint": pds_endpoint.clone() 552 } 553 }, 554 "prev": null, 555 "sig": "initial_sig" 556 }); 557 Mock::given(method("GET")) 558 .and(path(format!("/{}/log/last", did_encoded))) 559 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 560 .mount(&mock_server) 561 .await; 562 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 563 Mock::given(method("GET")) 564 .and(path(format!("/{}", did_encoded))) 565 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 566 .mount(&mock_server) 567 .await; 568 Mock::given(method("POST")) 569 .and(path(format!("/{}", did_encoded))) 570 .respond_with(ResponseTemplate::new(200)) 571 .expect(1) 572 .mount(&mock_server) 573 .await; 574 unsafe { 575 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 576 } 577 let sign_res = client 578 .post(format!( 579 "{}/xrpc/com.atproto.identity.signPlcOperation", 580 base_url().await 581 )) 582 .bearer_auth(&token) 583 .json(&json!({ "token": plc_token })) 584 .send() 585 .await 586 .expect("Sign failed"); 587 assert_eq!(sign_res.status(), StatusCode::OK); 588 let sign_body: Value = sign_res.json().await.unwrap(); 589 let signed_operation = sign_body.get("operation") 590 .expect("Response should contain operation") 591 .clone(); 592 assert!(signed_operation.get("sig").is_some()); 593 assert!(signed_operation.get("prev").is_some()); 594 let submit_res = client 595 .post(format!( 596 "{}/xrpc/com.atproto.identity.submitPlcOperation", 597 base_url().await 598 )) 599 .bearer_auth(&token) 600 .json(&json!({ "operation": signed_operation })) 601 .send() 602 .await 603 .expect("Submit failed"); 604 let submit_status = submit_res.status(); 605 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 606 assert_eq!( 607 submit_status, 608 StatusCode::OK, 609 "Full sign and submit flow should succeed. Response: {:?}", 610 submit_body 611 ); 612} 613 614#[tokio::test] 615async fn test_cross_pds_migration_with_records() { 616 let client = client(); 617 let (token, did) = create_account_and_login(&client).await; 618 let key_bytes = get_user_signing_key(&did).await 619 .expect("Failed to get user signing key"); 620 let signing_key = SigningKey::from_slice(&key_bytes) 621 .expect("Failed to create signing key"); 622 let handle = get_user_handle(&did).await 623 .expect("Failed to get user handle"); 624 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 625 let pds_endpoint = format!("https://{}", hostname); 626 let post_payload = json!({ 627 "repo": did, 628 "collection": "app.bsky.feed.post", 629 "record": { 630 "$type": "app.bsky.feed.post", 631 "text": "Test post before migration", 632 "createdAt": chrono::Utc::now().to_rfc3339(), 633 } 634 }); 635 let create_res = client 636 .post(format!( 637 "{}/xrpc/com.atproto.repo.createRecord", 638 base_url().await 639 )) 640 .bearer_auth(&token) 641 .json(&post_payload) 642 .send() 643 .await 644 .expect("Failed to create post"); 645 assert_eq!(create_res.status(), StatusCode::OK); 646 let create_body: Value = create_res.json().await.unwrap(); 647 let original_uri = create_body["uri"].as_str().unwrap().to_string(); 648 let export_res = client 649 .get(format!( 650 "{}/xrpc/com.atproto.sync.getRepo?did={}", 651 base_url().await, 652 did 653 )) 654 .send() 655 .await 656 .expect("Export failed"); 657 assert_eq!(export_res.status(), StatusCode::OK); 658 let car_bytes = export_res.bytes().await.unwrap(); 659 assert!(car_bytes.len() > 100, "CAR file should have meaningful content"); 660 let mock_server = MockServer::start().await; 661 let did_encoded = urlencoding::encode(&did); 662 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 663 Mock::given(method("GET")) 664 .and(path(format!("/{}", did_encoded))) 665 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 666 .mount(&mock_server) 667 .await; 668 unsafe { 669 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 670 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 671 } 672 let import_res = client 673 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 674 .bearer_auth(&token) 675 .header("Content-Type", "application/vnd.ipld.car") 676 .body(car_bytes.to_vec()) 677 .send() 678 .await 679 .expect("Import failed"); 680 let import_status = import_res.status(); 681 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 682 unsafe { 683 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 684 } 685 assert_eq!( 686 import_status, 687 StatusCode::OK, 688 "Import with valid DID document should succeed. Response: {:?}", 689 import_body 690 ); 691 let get_record_res = client 692 .get(format!( 693 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}", 694 base_url().await, 695 did, 696 original_uri.split('/').last().unwrap() 697 )) 698 .send() 699 .await 700 .expect("Get record failed"); 701 assert_eq!( 702 get_record_res.status(), 703 StatusCode::OK, 704 "Record should be retrievable after import" 705 ); 706 let record_body: Value = get_record_res.json().await.unwrap(); 707 assert_eq!( 708 record_body["value"]["text"], 709 "Test post before migration", 710 "Record content should match" 711 ); 712} 713 714#[tokio::test] 715async fn test_migration_rejects_wrong_did_document() { 716 let client = client(); 717 let (token, did) = create_account_and_login(&client).await; 718 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng()); 719 let handle = get_user_handle(&did).await 720 .expect("Failed to get user handle"); 721 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 722 let pds_endpoint = format!("https://{}", hostname); 723 let export_res = client 724 .get(format!( 725 "{}/xrpc/com.atproto.sync.getRepo?did={}", 726 base_url().await, 727 did 728 )) 729 .send() 730 .await 731 .expect("Export failed"); 732 assert_eq!(export_res.status(), StatusCode::OK); 733 let car_bytes = export_res.bytes().await.unwrap(); 734 let mock_server = MockServer::start().await; 735 let did_encoded = urlencoding::encode(&did); 736 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint); 737 Mock::given(method("GET")) 738 .and(path(format!("/{}", did_encoded))) 739 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc)) 740 .mount(&mock_server) 741 .await; 742 unsafe { 743 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 744 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 745 } 746 let import_res = client 747 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 748 .bearer_auth(&token) 749 .header("Content-Type", "application/vnd.ipld.car") 750 .body(car_bytes.to_vec()) 751 .send() 752 .await 753 .expect("Import failed"); 754 let import_status = import_res.status(); 755 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 756 unsafe { 757 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 758 } 759 assert_eq!( 760 import_status, 761 StatusCode::BAD_REQUEST, 762 "Import with wrong DID document should fail. Response: {:?}", 763 import_body 764 ); 765 assert!( 766 import_body["error"] == "InvalidSignature" || 767 import_body["message"].as_str().unwrap_or("").contains("signature"), 768 "Error should mention signature verification failure" 769 ); 770} 771 772#[tokio::test] 773#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"] 774async fn test_full_migration_flow_end_to_end() { 775 let client = client(); 776 let (token, did) = create_account_and_login(&client).await; 777 let key_bytes = get_user_signing_key(&did).await 778 .expect("Failed to get user signing key"); 779 let signing_key = SigningKey::from_slice(&key_bytes) 780 .expect("Failed to create signing key"); 781 let handle = get_user_handle(&did).await 782 .expect("Failed to get user handle"); 783 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 784 let pds_endpoint = format!("https://{}", hostname); 785 let did_key = signing_key_to_did_key(&signing_key); 786 for i in 0..3 { 787 let post_payload = json!({ 788 "repo": did, 789 "collection": "app.bsky.feed.post", 790 "record": { 791 "$type": "app.bsky.feed.post", 792 "text": format!("Pre-migration post #{}", i), 793 "createdAt": chrono::Utc::now().to_rfc3339(), 794 } 795 }); 796 let res = client 797 .post(format!( 798 "{}/xrpc/com.atproto.repo.createRecord", 799 base_url().await 800 )) 801 .bearer_auth(&token) 802 .json(&post_payload) 803 .send() 804 .await 805 .expect("Failed to create post"); 806 assert_eq!(res.status(), StatusCode::OK); 807 } 808 let request_res = client 809 .post(format!( 810 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 811 base_url().await 812 )) 813 .bearer_auth(&token) 814 .send() 815 .await 816 .expect("Request failed"); 817 assert_eq!(request_res.status(), StatusCode::OK); 818 let plc_token = get_plc_token_from_db(&did).await 819 .expect("PLC token not found"); 820 let mock_server = MockServer::start().await; 821 let did_encoded = urlencoding::encode(&did); 822 let last_op = json!({ 823 "type": "plc_operation", 824 "rotationKeys": [did_key.clone()], 825 "verificationMethods": { "atproto": did_key.clone() }, 826 "alsoKnownAs": [format!("at://{}", handle)], 827 "services": { 828 "atproto_pds": { 829 "type": "AtprotoPersonalDataServer", 830 "endpoint": pds_endpoint.clone() 831 } 832 }, 833 "prev": null, 834 "sig": "initial_sig" 835 }); 836 Mock::given(method("GET")) 837 .and(path(format!("/{}/log/last", did_encoded))) 838 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 839 .mount(&mock_server) 840 .await; 841 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 842 Mock::given(method("GET")) 843 .and(path(format!("/{}", did_encoded))) 844 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 845 .mount(&mock_server) 846 .await; 847 Mock::given(method("POST")) 848 .and(path(format!("/{}", did_encoded))) 849 .respond_with(ResponseTemplate::new(200)) 850 .expect(1) 851 .mount(&mock_server) 852 .await; 853 unsafe { 854 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 855 } 856 let sign_res = client 857 .post(format!( 858 "{}/xrpc/com.atproto.identity.signPlcOperation", 859 base_url().await 860 )) 861 .bearer_auth(&token) 862 .json(&json!({ "token": plc_token })) 863 .send() 864 .await 865 .expect("Sign failed"); 866 assert_eq!(sign_res.status(), StatusCode::OK); 867 let sign_body: Value = sign_res.json().await.unwrap(); 868 let signed_op = sign_body.get("operation").unwrap().clone(); 869 let export_res = client 870 .get(format!( 871 "{}/xrpc/com.atproto.sync.getRepo?did={}", 872 base_url().await, 873 did 874 )) 875 .send() 876 .await 877 .expect("Export failed"); 878 assert_eq!(export_res.status(), StatusCode::OK); 879 let car_bytes = export_res.bytes().await.unwrap(); 880 let submit_res = client 881 .post(format!( 882 "{}/xrpc/com.atproto.identity.submitPlcOperation", 883 base_url().await 884 )) 885 .bearer_auth(&token) 886 .json(&json!({ "operation": signed_op })) 887 .send() 888 .await 889 .expect("Submit failed"); 890 assert_eq!(submit_res.status(), StatusCode::OK); 891 unsafe { 892 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 893 } 894 let import_res = client 895 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 896 .bearer_auth(&token) 897 .header("Content-Type", "application/vnd.ipld.car") 898 .body(car_bytes.to_vec()) 899 .send() 900 .await 901 .expect("Import failed"); 902 let import_status = import_res.status(); 903 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 904 unsafe { 905 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 906 } 907 assert_eq!( 908 import_status, 909 StatusCode::OK, 910 "Full migration flow should succeed. Response: {:?}", 911 import_body 912 ); 913 let list_res = client 914 .get(format!( 915 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post", 916 base_url().await, 917 did 918 )) 919 .send() 920 .await 921 .expect("List failed"); 922 assert_eq!(list_res.status(), StatusCode::OK); 923 let list_body: Value = list_res.json().await.unwrap(); 924 let records = list_body["records"].as_array() 925 .expect("Should have records array"); 926 assert!( 927 records.len() >= 1, 928 "Should have at least 1 record after migration, found {}", 929 records.len() 930 ); 931}