this repo has no description
1mod common; 2use common::*; 3use k256::ecdsa::SigningKey; 4use reqwest::StatusCode; 5use serde_json::{Value, json}; 6use sqlx::PgPool; 7use wiremock::matchers::{method, path}; 8use wiremock::{Mock, MockServer, ResponseTemplate}; 9 10fn encode_uvarint(mut x: u64) -> Vec<u8> { 11 let mut out = Vec::new(); 12 while x >= 0x80 { 13 out.push(((x as u8) & 0x7F) | 0x80); 14 x >>= 7; 15 } 16 out.push(x as u8); 17 out 18} 19 20fn signing_key_to_did_key(signing_key: &SigningKey) -> String { 21 let verifying_key = signing_key.verifying_key(); 22 let point = verifying_key.to_encoded_point(true); 23 let compressed_bytes = point.as_bytes(); 24 let mut prefixed = vec![0xe7, 0x01]; 25 prefixed.extend_from_slice(compressed_bytes); 26 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed); 27 format!("did:key:{}", encoded) 28} 29 30fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String { 31 let public_key = signing_key.verifying_key(); 32 let compressed = public_key.to_sec1_bytes(); 33 let mut buf = encode_uvarint(0xE7); 34 buf.extend_from_slice(&compressed); 35 multibase::encode(multibase::Base::Base58Btc, buf) 36} 37 38async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> { 39 let db_url = get_db_connection_string().await; 40 let pool = PgPool::connect(&db_url).await.ok()?; 41 let row = sqlx::query!( 42 r#" 43 SELECT k.key_bytes, k.encryption_version 44 FROM user_keys k 45 JOIN users u ON k.user_id = u.id 46 WHERE u.did = $1 47 "#, 48 did 49 ) 50 .fetch_optional(&pool) 51 .await 52 .ok()??; 53 tranquil_pds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok() 54} 55 56async fn get_plc_token_from_db(did: &str) -> Option<String> { 57 let db_url = get_db_connection_string().await; 58 let pool = PgPool::connect(&db_url).await.ok()?; 59 sqlx::query_scalar!( 60 r#" 61 SELECT t.token 62 FROM plc_operation_tokens t 63 JOIN users u ON t.user_id = u.id 64 WHERE u.did = $1 65 "#, 66 did 67 ) 68 .fetch_optional(&pool) 69 .await 70 .ok()? 71} 72 73async fn get_user_handle(did: &str) -> Option<String> { 74 let db_url = get_db_connection_string().await; 75 let pool = PgPool::connect(&db_url).await.ok()?; 76 sqlx::query_scalar!(r#"SELECT handle FROM users WHERE did = $1"#, did) 77 .fetch_optional(&pool) 78 .await 79 .ok()? 80} 81 82fn create_mock_last_op( 83 _did: &str, 84 handle: &str, 85 signing_key: &SigningKey, 86 pds_endpoint: &str, 87) -> Value { 88 let did_key = signing_key_to_did_key(signing_key); 89 json!({ 90 "type": "plc_operation", 91 "rotationKeys": [did_key.clone()], 92 "verificationMethods": { 93 "atproto": did_key 94 }, 95 "alsoKnownAs": [format!("at://{}", handle)], 96 "services": { 97 "atproto_pds": { 98 "type": "AtprotoPersonalDataServer", 99 "endpoint": pds_endpoint 100 } 101 }, 102 "prev": null, 103 "sig": "mock_signature_for_testing" 104 }) 105} 106 107fn create_did_document( 108 did: &str, 109 handle: &str, 110 signing_key: &SigningKey, 111 pds_endpoint: &str, 112) -> Value { 113 let multikey = get_multikey_from_signing_key(signing_key); 114 json!({ 115 "@context": [ 116 "https://www.w3.org/ns/did/v1", 117 "https://w3id.org/security/multikey/v1" 118 ], 119 "id": did, 120 "alsoKnownAs": [format!("at://{}", handle)], 121 "verificationMethod": [{ 122 "id": format!("{}#atproto", did), 123 "type": "Multikey", 124 "controller": did, 125 "publicKeyMultibase": multikey 126 }], 127 "service": [{ 128 "id": "#atproto_pds", 129 "type": "AtprotoPersonalDataServer", 130 "serviceEndpoint": pds_endpoint 131 }] 132 }) 133} 134 135async fn setup_mock_plc_for_sign( 136 did: &str, 137 handle: &str, 138 signing_key: &SigningKey, 139 pds_endpoint: &str, 140) -> MockServer { 141 let mock_server = MockServer::start().await; 142 let did_encoded = urlencoding::encode(did); 143 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint); 144 Mock::given(method("GET")) 145 .and(path(format!("/{}/log/last", did_encoded))) 146 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 147 .mount(&mock_server) 148 .await; 149 mock_server 150} 151 152async fn setup_mock_plc_for_submit( 153 did: &str, 154 handle: &str, 155 signing_key: &SigningKey, 156 pds_endpoint: &str, 157) -> MockServer { 158 let mock_server = MockServer::start().await; 159 let did_encoded = urlencoding::encode(did); 160 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint); 161 Mock::given(method("GET")) 162 .and(path(format!("/{}", did_encoded))) 163 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone())) 164 .mount(&mock_server) 165 .await; 166 Mock::given(method("POST")) 167 .and(path(format!("/{}", did_encoded))) 168 .respond_with(ResponseTemplate::new(200)) 169 .mount(&mock_server) 170 .await; 171 mock_server 172} 173 174#[tokio::test] 175#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 176async fn test_full_plc_operation_flow() { 177 let client = client(); 178 let (token, did) = create_account_and_login(&client).await; 179 let key_bytes = get_user_signing_key(&did) 180 .await 181 .expect("Failed to get user signing key"); 182 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 183 let handle = get_user_handle(&did) 184 .await 185 .expect("Failed to get user handle"); 186 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 187 let pds_endpoint = format!("https://{}", hostname); 188 let request_res = client 189 .post(format!( 190 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 191 base_url().await 192 )) 193 .bearer_auth(&token) 194 .send() 195 .await 196 .expect("Request failed"); 197 assert_eq!(request_res.status(), StatusCode::OK); 198 let plc_token = get_plc_token_from_db(&did) 199 .await 200 .expect("PLC token not found in database"); 201 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 202 unsafe { 203 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 204 } 205 let sign_res = client 206 .post(format!( 207 "{}/xrpc/com.atproto.identity.signPlcOperation", 208 base_url().await 209 )) 210 .bearer_auth(&token) 211 .json(&json!({ 212 "token": plc_token 213 })) 214 .send() 215 .await 216 .expect("Sign request failed"); 217 let sign_status = sign_res.status(); 218 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 219 assert_eq!( 220 sign_status, 221 StatusCode::OK, 222 "Sign PLC operation should succeed. Response: {:?}", 223 sign_body 224 ); 225 let operation = sign_body 226 .get("operation") 227 .expect("Response should contain operation"); 228 assert!(operation.get("sig").is_some(), "Operation should be signed"); 229 assert_eq!( 230 operation.get("type").and_then(|v| v.as_str()), 231 Some("plc_operation") 232 ); 233 assert!( 234 operation.get("prev").is_some(), 235 "Operation should have prev reference" 236 ); 237} 238 239#[tokio::test] 240#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"] 241async fn test_sign_plc_operation_consumes_token() { 242 let client = client(); 243 let (token, did) = create_account_and_login(&client).await; 244 let key_bytes = get_user_signing_key(&did) 245 .await 246 .expect("Failed to get user signing key"); 247 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 248 let handle = get_user_handle(&did) 249 .await 250 .expect("Failed to get user handle"); 251 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 252 let pds_endpoint = format!("https://{}", hostname); 253 let request_res = client 254 .post(format!( 255 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 256 base_url().await 257 )) 258 .bearer_auth(&token) 259 .send() 260 .await 261 .expect("Request failed"); 262 assert_eq!(request_res.status(), StatusCode::OK); 263 let plc_token = get_plc_token_from_db(&did) 264 .await 265 .expect("PLC token not found in database"); 266 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 267 unsafe { 268 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 269 } 270 let sign_res = client 271 .post(format!( 272 "{}/xrpc/com.atproto.identity.signPlcOperation", 273 base_url().await 274 )) 275 .bearer_auth(&token) 276 .json(&json!({ 277 "token": plc_token 278 })) 279 .send() 280 .await 281 .expect("Sign request failed"); 282 assert_eq!(sign_res.status(), StatusCode::OK); 283 let sign_res_2 = client 284 .post(format!( 285 "{}/xrpc/com.atproto.identity.signPlcOperation", 286 base_url().await 287 )) 288 .bearer_auth(&token) 289 .json(&json!({ 290 "token": plc_token 291 })) 292 .send() 293 .await 294 .expect("Second sign request failed"); 295 assert_eq!( 296 sign_res_2.status(), 297 StatusCode::BAD_REQUEST, 298 "Using the same token twice should fail" 299 ); 300 let body: Value = sign_res_2.json().await.unwrap(); 301 assert!( 302 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken", 303 "Error should indicate invalid/expired token" 304 ); 305} 306 307#[tokio::test] 308#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_with_custom_fields -- --ignored --test-threads=1"] 309async fn test_sign_plc_operation_with_custom_fields() { 310 let client = client(); 311 let (token, did) = create_account_and_login(&client).await; 312 let key_bytes = get_user_signing_key(&did) 313 .await 314 .expect("Failed to get user signing key"); 315 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 316 let handle = get_user_handle(&did) 317 .await 318 .expect("Failed to get user handle"); 319 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 320 let pds_endpoint = format!("https://{}", hostname); 321 let request_res = client 322 .post(format!( 323 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 324 base_url().await 325 )) 326 .bearer_auth(&token) 327 .send() 328 .await 329 .expect("Request failed"); 330 assert_eq!(request_res.status(), StatusCode::OK); 331 let plc_token = get_plc_token_from_db(&did) 332 .await 333 .expect("PLC token not found in database"); 334 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 335 unsafe { 336 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 337 } 338 let did_key = signing_key_to_did_key(&signing_key); 339 let sign_res = client 340 .post(format!( 341 "{}/xrpc/com.atproto.identity.signPlcOperation", 342 base_url().await 343 )) 344 .bearer_auth(&token) 345 .json(&json!({ 346 "token": plc_token, 347 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"], 348 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"] 349 })) 350 .send() 351 .await 352 .expect("Sign request failed"); 353 let sign_status = sign_res.status(); 354 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 355 assert_eq!( 356 sign_status, 357 StatusCode::OK, 358 "Sign with custom fields should succeed. Response: {:?}", 359 sign_body 360 ); 361 let operation = sign_body.get("operation").expect("Should have operation"); 362 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array()); 363 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array()); 364 assert!(also_known_as.is_some(), "Should have alsoKnownAs"); 365 assert!(rotation_keys.is_some(), "Should have rotationKeys"); 366 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases"); 367 assert_eq!( 368 rotation_keys.unwrap().len(), 369 2, 370 "Should have 2 rotation keys" 371 ); 372} 373 374#[tokio::test] 375#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 376async fn test_submit_plc_operation_success() { 377 let client = client(); 378 let (token, did) = create_account_and_login(&client).await; 379 let key_bytes = get_user_signing_key(&did) 380 .await 381 .expect("Failed to get user signing key"); 382 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 383 let handle = get_user_handle(&did) 384 .await 385 .expect("Failed to get user handle"); 386 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 387 let pds_endpoint = format!("https://{}", hostname); 388 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 389 unsafe { 390 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 391 } 392 let did_key = signing_key_to_did_key(&signing_key); 393 let operation = json!({ 394 "type": "plc_operation", 395 "rotationKeys": [did_key.clone()], 396 "verificationMethods": { 397 "atproto": did_key.clone() 398 }, 399 "alsoKnownAs": [format!("at://{}", handle)], 400 "services": { 401 "atproto_pds": { 402 "type": "AtprotoPersonalDataServer", 403 "endpoint": pds_endpoint 404 } 405 }, 406 "prev": "bafyreiabc123", 407 "sig": "test_signature_base64" 408 }); 409 let submit_res = client 410 .post(format!( 411 "{}/xrpc/com.atproto.identity.submitPlcOperation", 412 base_url().await 413 )) 414 .bearer_auth(&token) 415 .json(&json!({ "operation": operation })) 416 .send() 417 .await 418 .expect("Submit request failed"); 419 let submit_status = submit_res.status(); 420 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 421 assert_eq!( 422 submit_status, 423 StatusCode::OK, 424 "Submit PLC operation should succeed. Response: {:?}", 425 submit_body 426 ); 427} 428 429#[tokio::test] 430async fn test_submit_plc_operation_wrong_endpoint_rejected() { 431 let client = client(); 432 let (token, did) = create_account_and_login(&client).await; 433 let key_bytes = get_user_signing_key(&did) 434 .await 435 .expect("Failed to get user signing key"); 436 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 437 let handle = get_user_handle(&did) 438 .await 439 .expect("Failed to get user handle"); 440 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 441 let pds_endpoint = format!("https://{}", hostname); 442 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 443 unsafe { 444 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 445 } 446 let did_key = signing_key_to_did_key(&signing_key); 447 let operation = json!({ 448 "type": "plc_operation", 449 "rotationKeys": [did_key.clone()], 450 "verificationMethods": { 451 "atproto": did_key.clone() 452 }, 453 "alsoKnownAs": [format!("at://{}", handle)], 454 "services": { 455 "atproto_pds": { 456 "type": "AtprotoPersonalDataServer", 457 "endpoint": "https://wrong-pds.example.com" 458 } 459 }, 460 "prev": "bafyreiabc123", 461 "sig": "test_signature_base64" 462 }); 463 let submit_res = client 464 .post(format!( 465 "{}/xrpc/com.atproto.identity.submitPlcOperation", 466 base_url().await 467 )) 468 .bearer_auth(&token) 469 .json(&json!({ "operation": operation })) 470 .send() 471 .await 472 .expect("Submit request failed"); 473 assert_eq!( 474 submit_res.status(), 475 StatusCode::BAD_REQUEST, 476 "Submit with wrong endpoint should fail" 477 ); 478 let body: Value = submit_res.json().await.unwrap(); 479 assert_eq!(body["error"], "InvalidRequest"); 480} 481 482#[tokio::test] 483async fn test_submit_plc_operation_wrong_signing_key_rejected() { 484 let client = client(); 485 let (token, did) = create_account_and_login(&client).await; 486 let key_bytes = get_user_signing_key(&did) 487 .await 488 .expect("Failed to get user signing key"); 489 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 490 let handle = get_user_handle(&did) 491 .await 492 .expect("Failed to get user handle"); 493 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 494 let pds_endpoint = format!("https://{}", hostname); 495 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 496 unsafe { 497 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 498 } 499 let wrong_key = SigningKey::random(&mut rand::thread_rng()); 500 let wrong_did_key = signing_key_to_did_key(&wrong_key); 501 let correct_did_key = signing_key_to_did_key(&signing_key); 502 let operation = json!({ 503 "type": "plc_operation", 504 "rotationKeys": [correct_did_key.clone()], 505 "verificationMethods": { 506 "atproto": wrong_did_key 507 }, 508 "alsoKnownAs": [format!("at://{}", handle)], 509 "services": { 510 "atproto_pds": { 511 "type": "AtprotoPersonalDataServer", 512 "endpoint": pds_endpoint 513 } 514 }, 515 "prev": "bafyreiabc123", 516 "sig": "test_signature_base64" 517 }); 518 let submit_res = client 519 .post(format!( 520 "{}/xrpc/com.atproto.identity.submitPlcOperation", 521 base_url().await 522 )) 523 .bearer_auth(&token) 524 .json(&json!({ "operation": operation })) 525 .send() 526 .await 527 .expect("Submit request failed"); 528 assert_eq!( 529 submit_res.status(), 530 StatusCode::BAD_REQUEST, 531 "Submit with wrong signing key should fail" 532 ); 533 let body: Value = submit_res.json().await.unwrap(); 534 assert_eq!(body["error"], "InvalidRequest"); 535} 536 537#[tokio::test] 538async fn test_full_sign_and_submit_flow() { 539 let client = client(); 540 let (token, did) = create_account_and_login(&client).await; 541 let key_bytes = get_user_signing_key(&did) 542 .await 543 .expect("Failed to get user signing key"); 544 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 545 let handle = get_user_handle(&did) 546 .await 547 .expect("Failed to get user handle"); 548 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 549 let pds_endpoint = format!("https://{}", hostname); 550 let request_res = client 551 .post(format!( 552 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 553 base_url().await 554 )) 555 .bearer_auth(&token) 556 .send() 557 .await 558 .expect("Request failed"); 559 assert_eq!(request_res.status(), StatusCode::OK); 560 let plc_token = get_plc_token_from_db(&did) 561 .await 562 .expect("PLC token not found"); 563 let mock_server = MockServer::start().await; 564 let did_encoded = urlencoding::encode(&did); 565 let did_key = signing_key_to_did_key(&signing_key); 566 let last_op = json!({ 567 "type": "plc_operation", 568 "rotationKeys": [did_key.clone()], 569 "verificationMethods": { 570 "atproto": did_key.clone() 571 }, 572 "alsoKnownAs": [format!("at://{}", handle)], 573 "services": { 574 "atproto_pds": { 575 "type": "AtprotoPersonalDataServer", 576 "endpoint": pds_endpoint.clone() 577 } 578 }, 579 "prev": null, 580 "sig": "initial_sig" 581 }); 582 Mock::given(method("GET")) 583 .and(path(format!("/{}/log/last", did_encoded))) 584 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 585 .mount(&mock_server) 586 .await; 587 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 588 Mock::given(method("GET")) 589 .and(path(format!("/{}", did_encoded))) 590 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 591 .mount(&mock_server) 592 .await; 593 Mock::given(method("POST")) 594 .and(path(format!("/{}", did_encoded))) 595 .respond_with(ResponseTemplate::new(200)) 596 .expect(1) 597 .mount(&mock_server) 598 .await; 599 unsafe { 600 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 601 } 602 let sign_res = client 603 .post(format!( 604 "{}/xrpc/com.atproto.identity.signPlcOperation", 605 base_url().await 606 )) 607 .bearer_auth(&token) 608 .json(&json!({ "token": plc_token })) 609 .send() 610 .await 611 .expect("Sign failed"); 612 assert_eq!(sign_res.status(), StatusCode::OK); 613 let sign_body: Value = sign_res.json().await.unwrap(); 614 let signed_operation = sign_body 615 .get("operation") 616 .expect("Response should contain operation") 617 .clone(); 618 assert!(signed_operation.get("sig").is_some()); 619 assert!(signed_operation.get("prev").is_some()); 620 let submit_res = client 621 .post(format!( 622 "{}/xrpc/com.atproto.identity.submitPlcOperation", 623 base_url().await 624 )) 625 .bearer_auth(&token) 626 .json(&json!({ "operation": signed_operation })) 627 .send() 628 .await 629 .expect("Submit failed"); 630 let submit_status = submit_res.status(); 631 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 632 assert_eq!( 633 submit_status, 634 StatusCode::OK, 635 "Full sign and submit flow should succeed. Response: {:?}", 636 submit_body 637 ); 638} 639 640#[tokio::test] 641#[ignore = "requires exclusive env var access; run with: cargo test test_cross_pds_migration_with_records -- --ignored --test-threads=1"] 642async fn test_cross_pds_migration_with_records() { 643 let client = client(); 644 let (token, did) = create_account_and_login(&client).await; 645 let key_bytes = get_user_signing_key(&did) 646 .await 647 .expect("Failed to get user signing key"); 648 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 649 let handle = get_user_handle(&did) 650 .await 651 .expect("Failed to get user handle"); 652 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 653 let pds_endpoint = format!("https://{}", hostname); 654 let post_payload = json!({ 655 "repo": did, 656 "collection": "app.bsky.feed.post", 657 "record": { 658 "$type": "app.bsky.feed.post", 659 "text": "Test post before migration", 660 "createdAt": chrono::Utc::now().to_rfc3339(), 661 } 662 }); 663 let create_res = client 664 .post(format!( 665 "{}/xrpc/com.atproto.repo.createRecord", 666 base_url().await 667 )) 668 .bearer_auth(&token) 669 .json(&post_payload) 670 .send() 671 .await 672 .expect("Failed to create post"); 673 assert_eq!(create_res.status(), StatusCode::OK); 674 let create_body: Value = create_res.json().await.unwrap(); 675 let original_uri = create_body["uri"].as_str().unwrap().to_string(); 676 let export_res = client 677 .get(format!( 678 "{}/xrpc/com.atproto.sync.getRepo?did={}", 679 base_url().await, 680 did 681 )) 682 .send() 683 .await 684 .expect("Export failed"); 685 assert_eq!(export_res.status(), StatusCode::OK); 686 let car_bytes = export_res.bytes().await.unwrap(); 687 assert!( 688 car_bytes.len() > 100, 689 "CAR file should have meaningful content" 690 ); 691 let mock_server = MockServer::start().await; 692 let did_encoded = urlencoding::encode(&did); 693 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 694 Mock::given(method("GET")) 695 .and(path(format!("/{}", did_encoded))) 696 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 697 .mount(&mock_server) 698 .await; 699 unsafe { 700 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 701 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 702 } 703 let import_res = client 704 .post(format!( 705 "{}/xrpc/com.atproto.repo.importRepo", 706 base_url().await 707 )) 708 .bearer_auth(&token) 709 .header("Content-Type", "application/vnd.ipld.car") 710 .body(car_bytes.to_vec()) 711 .send() 712 .await 713 .expect("Import failed"); 714 let import_status = import_res.status(); 715 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 716 unsafe { 717 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 718 } 719 assert_eq!( 720 import_status, 721 StatusCode::OK, 722 "Import with valid DID document should succeed. Response: {:?}", 723 import_body 724 ); 725 let get_record_res = client 726 .get(format!( 727 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}", 728 base_url().await, 729 did, 730 original_uri.split('/').next_back().unwrap() 731 )) 732 .send() 733 .await 734 .expect("Get record failed"); 735 assert_eq!( 736 get_record_res.status(), 737 StatusCode::OK, 738 "Record should be retrievable after import" 739 ); 740 let record_body: Value = get_record_res.json().await.unwrap(); 741 assert_eq!( 742 record_body["value"]["text"], "Test post before migration", 743 "Record content should match" 744 ); 745} 746 747#[tokio::test] 748async fn test_migration_rejects_wrong_did_document() { 749 let client = client(); 750 let (token, did) = create_account_and_login(&client).await; 751 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng()); 752 let handle = get_user_handle(&did) 753 .await 754 .expect("Failed to get user handle"); 755 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 756 let pds_endpoint = format!("https://{}", hostname); 757 let export_res = client 758 .get(format!( 759 "{}/xrpc/com.atproto.sync.getRepo?did={}", 760 base_url().await, 761 did 762 )) 763 .send() 764 .await 765 .expect("Export failed"); 766 assert_eq!(export_res.status(), StatusCode::OK); 767 let car_bytes = export_res.bytes().await.unwrap(); 768 let mock_server = MockServer::start().await; 769 let did_encoded = urlencoding::encode(&did); 770 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint); 771 Mock::given(method("GET")) 772 .and(path(format!("/{}", did_encoded))) 773 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc)) 774 .mount(&mock_server) 775 .await; 776 unsafe { 777 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 778 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 779 } 780 let import_res = client 781 .post(format!( 782 "{}/xrpc/com.atproto.repo.importRepo", 783 base_url().await 784 )) 785 .bearer_auth(&token) 786 .header("Content-Type", "application/vnd.ipld.car") 787 .body(car_bytes.to_vec()) 788 .send() 789 .await 790 .expect("Import failed"); 791 let import_status = import_res.status(); 792 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 793 unsafe { 794 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 795 } 796 assert_eq!( 797 import_status, 798 StatusCode::BAD_REQUEST, 799 "Import with wrong DID document should fail. Response: {:?}", 800 import_body 801 ); 802 assert!( 803 import_body["error"] == "InvalidSignature" 804 || import_body["message"] 805 .as_str() 806 .unwrap_or("") 807 .contains("signature"), 808 "Error should mention signature verification failure" 809 ); 810} 811 812#[tokio::test] 813#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"] 814async fn test_full_migration_flow_end_to_end() { 815 let client = client(); 816 let (token, did) = create_account_and_login(&client).await; 817 let key_bytes = get_user_signing_key(&did) 818 .await 819 .expect("Failed to get user signing key"); 820 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key"); 821 let handle = get_user_handle(&did) 822 .await 823 .expect("Failed to get user handle"); 824 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 825 let pds_endpoint = format!("https://{}", hostname); 826 let did_key = signing_key_to_did_key(&signing_key); 827 for i in 0..3 { 828 let post_payload = json!({ 829 "repo": did, 830 "collection": "app.bsky.feed.post", 831 "record": { 832 "$type": "app.bsky.feed.post", 833 "text": format!("Pre-migration post #{}", i), 834 "createdAt": chrono::Utc::now().to_rfc3339(), 835 } 836 }); 837 let res = client 838 .post(format!( 839 "{}/xrpc/com.atproto.repo.createRecord", 840 base_url().await 841 )) 842 .bearer_auth(&token) 843 .json(&post_payload) 844 .send() 845 .await 846 .expect("Failed to create post"); 847 assert_eq!(res.status(), StatusCode::OK); 848 } 849 let request_res = client 850 .post(format!( 851 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 852 base_url().await 853 )) 854 .bearer_auth(&token) 855 .send() 856 .await 857 .expect("Request failed"); 858 assert_eq!(request_res.status(), StatusCode::OK); 859 let plc_token = get_plc_token_from_db(&did) 860 .await 861 .expect("PLC token not found"); 862 let mock_server = MockServer::start().await; 863 let did_encoded = urlencoding::encode(&did); 864 let last_op = json!({ 865 "type": "plc_operation", 866 "rotationKeys": [did_key.clone()], 867 "verificationMethods": { "atproto": did_key.clone() }, 868 "alsoKnownAs": [format!("at://{}", handle)], 869 "services": { 870 "atproto_pds": { 871 "type": "AtprotoPersonalDataServer", 872 "endpoint": pds_endpoint.clone() 873 } 874 }, 875 "prev": null, 876 "sig": "initial_sig" 877 }); 878 Mock::given(method("GET")) 879 .and(path(format!("/{}/log/last", did_encoded))) 880 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 881 .mount(&mock_server) 882 .await; 883 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 884 Mock::given(method("GET")) 885 .and(path(format!("/{}", did_encoded))) 886 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 887 .mount(&mock_server) 888 .await; 889 Mock::given(method("POST")) 890 .and(path(format!("/{}", did_encoded))) 891 .respond_with(ResponseTemplate::new(200)) 892 .expect(1) 893 .mount(&mock_server) 894 .await; 895 unsafe { 896 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 897 } 898 let sign_res = client 899 .post(format!( 900 "{}/xrpc/com.atproto.identity.signPlcOperation", 901 base_url().await 902 )) 903 .bearer_auth(&token) 904 .json(&json!({ "token": plc_token })) 905 .send() 906 .await 907 .expect("Sign failed"); 908 assert_eq!(sign_res.status(), StatusCode::OK); 909 let sign_body: Value = sign_res.json().await.unwrap(); 910 let signed_op = sign_body.get("operation").unwrap().clone(); 911 let export_res = client 912 .get(format!( 913 "{}/xrpc/com.atproto.sync.getRepo?did={}", 914 base_url().await, 915 did 916 )) 917 .send() 918 .await 919 .expect("Export failed"); 920 assert_eq!(export_res.status(), StatusCode::OK); 921 let car_bytes = export_res.bytes().await.unwrap(); 922 let submit_res = client 923 .post(format!( 924 "{}/xrpc/com.atproto.identity.submitPlcOperation", 925 base_url().await 926 )) 927 .bearer_auth(&token) 928 .json(&json!({ "operation": signed_op })) 929 .send() 930 .await 931 .expect("Submit failed"); 932 assert_eq!(submit_res.status(), StatusCode::OK); 933 unsafe { 934 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 935 } 936 let import_res = client 937 .post(format!( 938 "{}/xrpc/com.atproto.repo.importRepo", 939 base_url().await 940 )) 941 .bearer_auth(&token) 942 .header("Content-Type", "application/vnd.ipld.car") 943 .body(car_bytes.to_vec()) 944 .send() 945 .await 946 .expect("Import failed"); 947 let import_status = import_res.status(); 948 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 949 unsafe { 950 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 951 } 952 assert_eq!( 953 import_status, 954 StatusCode::OK, 955 "Full migration flow should succeed. Response: {:?}", 956 import_body 957 ); 958 let list_res = client 959 .get(format!( 960 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post", 961 base_url().await, 962 did 963 )) 964 .send() 965 .await 966 .expect("List failed"); 967 assert_eq!(list_res.status(), StatusCode::OK); 968 let list_body: Value = list_res.json().await.unwrap(); 969 let records = list_body["records"] 970 .as_array() 971 .expect("Should have records array"); 972 assert!( 973 !records.is_empty(), 974 "Should have at least 1 record after migration, found {}", 975 records.len() 976 ); 977}