this repo has no description
1mod common; 2use common::*; 3use k256::ecdsa::SigningKey; 4use reqwest::StatusCode; 5use serde_json::{json, Value}; 6use sqlx::PgPool; 7use wiremock::matchers::{method, path}; 8use wiremock::{Mock, MockServer, ResponseTemplate}; 9fn encode_uvarint(mut x: u64) -> Vec<u8> { 10 let mut out = Vec::new(); 11 while x >= 0x80 { 12 out.push(((x as u8) & 0x7F) | 0x80); 13 x >>= 7; 14 } 15 out.push(x as u8); 16 out 17} 18fn signing_key_to_did_key(signing_key: &SigningKey) -> String { 19 let verifying_key = signing_key.verifying_key(); 20 let point = verifying_key.to_encoded_point(true); 21 let compressed_bytes = point.as_bytes(); 22 let mut prefixed = vec![0xe7, 0x01]; 23 prefixed.extend_from_slice(compressed_bytes); 24 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed); 25 format!("did:key:{}", encoded) 26} 27fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String { 28 let public_key = signing_key.verifying_key(); 29 let compressed = public_key.to_sec1_bytes(); 30 let mut buf = encode_uvarint(0xE7); 31 buf.extend_from_slice(&compressed); 32 multibase::encode(multibase::Base::Base58Btc, buf) 33} 34async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> { 35 let db_url = get_db_connection_string().await; 36 let pool = PgPool::connect(&db_url).await.ok()?; 37 let row = sqlx::query!( 38 r#" 39 SELECT k.key_bytes, k.encryption_version 40 FROM user_keys k 41 JOIN users u ON k.user_id = u.id 42 WHERE u.did = $1 43 "#, 44 did 45 ) 46 .fetch_optional(&pool) 47 .await 48 .ok()??; 49 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok() 50} 51async fn get_plc_token_from_db(did: &str) -> Option<String> { 52 let db_url = get_db_connection_string().await; 53 let pool = PgPool::connect(&db_url).await.ok()?; 54 sqlx::query_scalar!( 55 r#" 56 SELECT t.token 57 FROM plc_operation_tokens t 58 JOIN users u ON t.user_id = u.id 59 WHERE u.did = $1 60 "#, 61 did 62 ) 63 .fetch_optional(&pool) 64 .await 65 .ok()? 66} 67async fn get_user_handle(did: &str) -> Option<String> { 68 let db_url = get_db_connection_string().await; 69 let pool = PgPool::connect(&db_url).await.ok()?; 70 sqlx::query_scalar!( 71 r#"SELECT handle FROM users WHERE did = $1"#, 72 did 73 ) 74 .fetch_optional(&pool) 75 .await 76 .ok()? 77} 78fn create_mock_last_op( 79 _did: &str, 80 handle: &str, 81 signing_key: &SigningKey, 82 pds_endpoint: &str, 83) -> Value { 84 let did_key = signing_key_to_did_key(signing_key); 85 json!({ 86 "type": "plc_operation", 87 "rotationKeys": [did_key.clone()], 88 "verificationMethods": { 89 "atproto": did_key 90 }, 91 "alsoKnownAs": [format!("at://{}", handle)], 92 "services": { 93 "atproto_pds": { 94 "type": "AtprotoPersonalDataServer", 95 "endpoint": pds_endpoint 96 } 97 }, 98 "prev": null, 99 "sig": "mock_signature_for_testing" 100 }) 101} 102fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value { 103 let multikey = get_multikey_from_signing_key(signing_key); 104 json!({ 105 "@context": [ 106 "https://www.w3.org/ns/did/v1", 107 "https://w3id.org/security/multikey/v1" 108 ], 109 "id": did, 110 "alsoKnownAs": [format!("at://{}", handle)], 111 "verificationMethod": [{ 112 "id": format!("{}#atproto", did), 113 "type": "Multikey", 114 "controller": did, 115 "publicKeyMultibase": multikey 116 }], 117 "service": [{ 118 "id": "#atproto_pds", 119 "type": "AtprotoPersonalDataServer", 120 "serviceEndpoint": pds_endpoint 121 }] 122 }) 123} 124async fn setup_mock_plc_for_sign( 125 did: &str, 126 handle: &str, 127 signing_key: &SigningKey, 128 pds_endpoint: &str, 129) -> MockServer { 130 let mock_server = MockServer::start().await; 131 let did_encoded = urlencoding::encode(did); 132 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint); 133 Mock::given(method("GET")) 134 .and(path(format!("/{}/log/last", did_encoded))) 135 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 136 .mount(&mock_server) 137 .await; 138 mock_server 139} 140async fn setup_mock_plc_for_submit( 141 did: &str, 142 handle: &str, 143 signing_key: &SigningKey, 144 pds_endpoint: &str, 145) -> MockServer { 146 let mock_server = MockServer::start().await; 147 let did_encoded = urlencoding::encode(did); 148 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint); 149 Mock::given(method("GET")) 150 .and(path(format!("/{}", did_encoded))) 151 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone())) 152 .mount(&mock_server) 153 .await; 154 Mock::given(method("POST")) 155 .and(path(format!("/{}", did_encoded))) 156 .respond_with(ResponseTemplate::new(200)) 157 .mount(&mock_server) 158 .await; 159 mock_server 160} 161#[tokio::test] 162#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 163async fn test_full_plc_operation_flow() { 164 let client = client(); 165 let (token, did) = create_account_and_login(&client).await; 166 let key_bytes = get_user_signing_key(&did).await 167 .expect("Failed to get user signing key"); 168 let signing_key = SigningKey::from_slice(&key_bytes) 169 .expect("Failed to create signing key"); 170 let handle = get_user_handle(&did).await 171 .expect("Failed to get user handle"); 172 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 173 let pds_endpoint = format!("https://{}", hostname); 174 let request_res = client 175 .post(format!( 176 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 177 base_url().await 178 )) 179 .bearer_auth(&token) 180 .send() 181 .await 182 .expect("Request failed"); 183 assert_eq!(request_res.status(), StatusCode::OK); 184 let plc_token = get_plc_token_from_db(&did).await 185 .expect("PLC token not found in database"); 186 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 187 unsafe { 188 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 189 } 190 let sign_res = client 191 .post(format!( 192 "{}/xrpc/com.atproto.identity.signPlcOperation", 193 base_url().await 194 )) 195 .bearer_auth(&token) 196 .json(&json!({ 197 "token": plc_token 198 })) 199 .send() 200 .await 201 .expect("Sign request failed"); 202 let sign_status = sign_res.status(); 203 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 204 assert_eq!( 205 sign_status, 206 StatusCode::OK, 207 "Sign PLC operation should succeed. Response: {:?}", 208 sign_body 209 ); 210 let operation = sign_body.get("operation") 211 .expect("Response should contain operation"); 212 assert!(operation.get("sig").is_some(), "Operation should be signed"); 213 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation")); 214 assert!(operation.get("prev").is_some(), "Operation should have prev reference"); 215} 216#[tokio::test] 217#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"] 218async fn test_sign_plc_operation_consumes_token() { 219 let client = client(); 220 let (token, did) = create_account_and_login(&client).await; 221 let key_bytes = get_user_signing_key(&did).await 222 .expect("Failed to get user signing key"); 223 let signing_key = SigningKey::from_slice(&key_bytes) 224 .expect("Failed to create signing key"); 225 let handle = get_user_handle(&did).await 226 .expect("Failed to get user handle"); 227 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 228 let pds_endpoint = format!("https://{}", hostname); 229 let request_res = client 230 .post(format!( 231 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 232 base_url().await 233 )) 234 .bearer_auth(&token) 235 .send() 236 .await 237 .expect("Request failed"); 238 assert_eq!(request_res.status(), StatusCode::OK); 239 let plc_token = get_plc_token_from_db(&did).await 240 .expect("PLC token not found in database"); 241 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 242 unsafe { 243 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 244 } 245 let sign_res = client 246 .post(format!( 247 "{}/xrpc/com.atproto.identity.signPlcOperation", 248 base_url().await 249 )) 250 .bearer_auth(&token) 251 .json(&json!({ 252 "token": plc_token 253 })) 254 .send() 255 .await 256 .expect("Sign request failed"); 257 assert_eq!(sign_res.status(), StatusCode::OK); 258 let sign_res_2 = client 259 .post(format!( 260 "{}/xrpc/com.atproto.identity.signPlcOperation", 261 base_url().await 262 )) 263 .bearer_auth(&token) 264 .json(&json!({ 265 "token": plc_token 266 })) 267 .send() 268 .await 269 .expect("Second sign request failed"); 270 assert_eq!( 271 sign_res_2.status(), 272 StatusCode::BAD_REQUEST, 273 "Using the same token twice should fail" 274 ); 275 let body: Value = sign_res_2.json().await.unwrap(); 276 assert!( 277 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken", 278 "Error should indicate invalid/expired token" 279 ); 280} 281#[tokio::test] 282async fn test_sign_plc_operation_with_custom_fields() { 283 let client = client(); 284 let (token, did) = create_account_and_login(&client).await; 285 let key_bytes = get_user_signing_key(&did).await 286 .expect("Failed to get user signing key"); 287 let signing_key = SigningKey::from_slice(&key_bytes) 288 .expect("Failed to create signing key"); 289 let handle = get_user_handle(&did).await 290 .expect("Failed to get user handle"); 291 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 292 let pds_endpoint = format!("https://{}", hostname); 293 let request_res = client 294 .post(format!( 295 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 296 base_url().await 297 )) 298 .bearer_auth(&token) 299 .send() 300 .await 301 .expect("Request failed"); 302 assert_eq!(request_res.status(), StatusCode::OK); 303 let plc_token = get_plc_token_from_db(&did).await 304 .expect("PLC token not found in database"); 305 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await; 306 unsafe { 307 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 308 } 309 let did_key = signing_key_to_did_key(&signing_key); 310 let sign_res = client 311 .post(format!( 312 "{}/xrpc/com.atproto.identity.signPlcOperation", 313 base_url().await 314 )) 315 .bearer_auth(&token) 316 .json(&json!({ 317 "token": plc_token, 318 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"], 319 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"] 320 })) 321 .send() 322 .await 323 .expect("Sign request failed"); 324 let sign_status = sign_res.status(); 325 let sign_body: Value = sign_res.json().await.unwrap_or(json!({})); 326 assert_eq!( 327 sign_status, 328 StatusCode::OK, 329 "Sign with custom fields should succeed. Response: {:?}", 330 sign_body 331 ); 332 let operation = sign_body.get("operation").expect("Should have operation"); 333 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array()); 334 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array()); 335 assert!(also_known_as.is_some(), "Should have alsoKnownAs"); 336 assert!(rotation_keys.is_some(), "Should have rotationKeys"); 337 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases"); 338 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys"); 339} 340#[tokio::test] 341#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"] 342async fn test_submit_plc_operation_success() { 343 let client = client(); 344 let (token, did) = create_account_and_login(&client).await; 345 let key_bytes = get_user_signing_key(&did).await 346 .expect("Failed to get user signing key"); 347 let signing_key = SigningKey::from_slice(&key_bytes) 348 .expect("Failed to create signing key"); 349 let handle = get_user_handle(&did).await 350 .expect("Failed to get user handle"); 351 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 352 let pds_endpoint = format!("https://{}", hostname); 353 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 354 unsafe { 355 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 356 } 357 let did_key = signing_key_to_did_key(&signing_key); 358 let operation = json!({ 359 "type": "plc_operation", 360 "rotationKeys": [did_key.clone()], 361 "verificationMethods": { 362 "atproto": did_key.clone() 363 }, 364 "alsoKnownAs": [format!("at://{}", handle)], 365 "services": { 366 "atproto_pds": { 367 "type": "AtprotoPersonalDataServer", 368 "endpoint": pds_endpoint 369 } 370 }, 371 "prev": "bafyreiabc123", 372 "sig": "test_signature_base64" 373 }); 374 let submit_res = client 375 .post(format!( 376 "{}/xrpc/com.atproto.identity.submitPlcOperation", 377 base_url().await 378 )) 379 .bearer_auth(&token) 380 .json(&json!({ "operation": operation })) 381 .send() 382 .await 383 .expect("Submit request failed"); 384 let submit_status = submit_res.status(); 385 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 386 assert_eq!( 387 submit_status, 388 StatusCode::OK, 389 "Submit PLC operation should succeed. Response: {:?}", 390 submit_body 391 ); 392} 393#[tokio::test] 394async fn test_submit_plc_operation_wrong_endpoint_rejected() { 395 let client = client(); 396 let (token, did) = create_account_and_login(&client).await; 397 let key_bytes = get_user_signing_key(&did).await 398 .expect("Failed to get user signing key"); 399 let signing_key = SigningKey::from_slice(&key_bytes) 400 .expect("Failed to create signing key"); 401 let handle = get_user_handle(&did).await 402 .expect("Failed to get user handle"); 403 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 404 let pds_endpoint = format!("https://{}", hostname); 405 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 406 unsafe { 407 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 408 } 409 let did_key = signing_key_to_did_key(&signing_key); 410 let operation = json!({ 411 "type": "plc_operation", 412 "rotationKeys": [did_key.clone()], 413 "verificationMethods": { 414 "atproto": did_key.clone() 415 }, 416 "alsoKnownAs": [format!("at://{}", handle)], 417 "services": { 418 "atproto_pds": { 419 "type": "AtprotoPersonalDataServer", 420 "endpoint": "https://wrong-pds.example.com" 421 } 422 }, 423 "prev": "bafyreiabc123", 424 "sig": "test_signature_base64" 425 }); 426 let submit_res = client 427 .post(format!( 428 "{}/xrpc/com.atproto.identity.submitPlcOperation", 429 base_url().await 430 )) 431 .bearer_auth(&token) 432 .json(&json!({ "operation": operation })) 433 .send() 434 .await 435 .expect("Submit request failed"); 436 assert_eq!( 437 submit_res.status(), 438 StatusCode::BAD_REQUEST, 439 "Submit with wrong endpoint should fail" 440 ); 441 let body: Value = submit_res.json().await.unwrap(); 442 assert_eq!(body["error"], "InvalidRequest"); 443} 444#[tokio::test] 445async fn test_submit_plc_operation_wrong_signing_key_rejected() { 446 let client = client(); 447 let (token, did) = create_account_and_login(&client).await; 448 let key_bytes = get_user_signing_key(&did).await 449 .expect("Failed to get user signing key"); 450 let signing_key = SigningKey::from_slice(&key_bytes) 451 .expect("Failed to create signing key"); 452 let handle = get_user_handle(&did).await 453 .expect("Failed to get user handle"); 454 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 455 let pds_endpoint = format!("https://{}", hostname); 456 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await; 457 unsafe { 458 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri()); 459 } 460 let wrong_key = SigningKey::random(&mut rand::thread_rng()); 461 let wrong_did_key = signing_key_to_did_key(&wrong_key); 462 let correct_did_key = signing_key_to_did_key(&signing_key); 463 let operation = json!({ 464 "type": "plc_operation", 465 "rotationKeys": [correct_did_key.clone()], 466 "verificationMethods": { 467 "atproto": wrong_did_key 468 }, 469 "alsoKnownAs": [format!("at://{}", handle)], 470 "services": { 471 "atproto_pds": { 472 "type": "AtprotoPersonalDataServer", 473 "endpoint": pds_endpoint 474 } 475 }, 476 "prev": "bafyreiabc123", 477 "sig": "test_signature_base64" 478 }); 479 let submit_res = client 480 .post(format!( 481 "{}/xrpc/com.atproto.identity.submitPlcOperation", 482 base_url().await 483 )) 484 .bearer_auth(&token) 485 .json(&json!({ "operation": operation })) 486 .send() 487 .await 488 .expect("Submit request failed"); 489 assert_eq!( 490 submit_res.status(), 491 StatusCode::BAD_REQUEST, 492 "Submit with wrong signing key should fail" 493 ); 494 let body: Value = submit_res.json().await.unwrap(); 495 assert_eq!(body["error"], "InvalidRequest"); 496} 497#[tokio::test] 498async fn test_full_sign_and_submit_flow() { 499 let client = client(); 500 let (token, did) = create_account_and_login(&client).await; 501 let key_bytes = get_user_signing_key(&did).await 502 .expect("Failed to get user signing key"); 503 let signing_key = SigningKey::from_slice(&key_bytes) 504 .expect("Failed to create signing key"); 505 let handle = get_user_handle(&did).await 506 .expect("Failed to get user handle"); 507 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 508 let pds_endpoint = format!("https://{}", hostname); 509 let request_res = client 510 .post(format!( 511 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 512 base_url().await 513 )) 514 .bearer_auth(&token) 515 .send() 516 .await 517 .expect("Request failed"); 518 assert_eq!(request_res.status(), StatusCode::OK); 519 let plc_token = get_plc_token_from_db(&did).await 520 .expect("PLC token not found"); 521 let mock_server = MockServer::start().await; 522 let did_encoded = urlencoding::encode(&did); 523 let did_key = signing_key_to_did_key(&signing_key); 524 let last_op = json!({ 525 "type": "plc_operation", 526 "rotationKeys": [did_key.clone()], 527 "verificationMethods": { 528 "atproto": did_key.clone() 529 }, 530 "alsoKnownAs": [format!("at://{}", handle)], 531 "services": { 532 "atproto_pds": { 533 "type": "AtprotoPersonalDataServer", 534 "endpoint": pds_endpoint.clone() 535 } 536 }, 537 "prev": null, 538 "sig": "initial_sig" 539 }); 540 Mock::given(method("GET")) 541 .and(path(format!("/{}/log/last", did_encoded))) 542 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 543 .mount(&mock_server) 544 .await; 545 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 546 Mock::given(method("GET")) 547 .and(path(format!("/{}", did_encoded))) 548 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 549 .mount(&mock_server) 550 .await; 551 Mock::given(method("POST")) 552 .and(path(format!("/{}", did_encoded))) 553 .respond_with(ResponseTemplate::new(200)) 554 .expect(1) 555 .mount(&mock_server) 556 .await; 557 unsafe { 558 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 559 } 560 let sign_res = client 561 .post(format!( 562 "{}/xrpc/com.atproto.identity.signPlcOperation", 563 base_url().await 564 )) 565 .bearer_auth(&token) 566 .json(&json!({ "token": plc_token })) 567 .send() 568 .await 569 .expect("Sign failed"); 570 assert_eq!(sign_res.status(), StatusCode::OK); 571 let sign_body: Value = sign_res.json().await.unwrap(); 572 let signed_operation = sign_body.get("operation") 573 .expect("Response should contain operation") 574 .clone(); 575 assert!(signed_operation.get("sig").is_some()); 576 assert!(signed_operation.get("prev").is_some()); 577 let submit_res = client 578 .post(format!( 579 "{}/xrpc/com.atproto.identity.submitPlcOperation", 580 base_url().await 581 )) 582 .bearer_auth(&token) 583 .json(&json!({ "operation": signed_operation })) 584 .send() 585 .await 586 .expect("Submit failed"); 587 let submit_status = submit_res.status(); 588 let submit_body: Value = submit_res.json().await.unwrap_or(json!({})); 589 assert_eq!( 590 submit_status, 591 StatusCode::OK, 592 "Full sign and submit flow should succeed. Response: {:?}", 593 submit_body 594 ); 595} 596#[tokio::test] 597async fn test_cross_pds_migration_with_records() { 598 let client = client(); 599 let (token, did) = create_account_and_login(&client).await; 600 let key_bytes = get_user_signing_key(&did).await 601 .expect("Failed to get user signing key"); 602 let signing_key = SigningKey::from_slice(&key_bytes) 603 .expect("Failed to create signing key"); 604 let handle = get_user_handle(&did).await 605 .expect("Failed to get user handle"); 606 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 607 let pds_endpoint = format!("https://{}", hostname); 608 let post_payload = json!({ 609 "repo": did, 610 "collection": "app.bsky.feed.post", 611 "record": { 612 "$type": "app.bsky.feed.post", 613 "text": "Test post before migration", 614 "createdAt": chrono::Utc::now().to_rfc3339(), 615 } 616 }); 617 let create_res = client 618 .post(format!( 619 "{}/xrpc/com.atproto.repo.createRecord", 620 base_url().await 621 )) 622 .bearer_auth(&token) 623 .json(&post_payload) 624 .send() 625 .await 626 .expect("Failed to create post"); 627 assert_eq!(create_res.status(), StatusCode::OK); 628 let create_body: Value = create_res.json().await.unwrap(); 629 let original_uri = create_body["uri"].as_str().unwrap().to_string(); 630 let export_res = client 631 .get(format!( 632 "{}/xrpc/com.atproto.sync.getRepo?did={}", 633 base_url().await, 634 did 635 )) 636 .send() 637 .await 638 .expect("Export failed"); 639 assert_eq!(export_res.status(), StatusCode::OK); 640 let car_bytes = export_res.bytes().await.unwrap(); 641 assert!(car_bytes.len() > 100, "CAR file should have meaningful content"); 642 let mock_server = MockServer::start().await; 643 let did_encoded = urlencoding::encode(&did); 644 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 645 Mock::given(method("GET")) 646 .and(path(format!("/{}", did_encoded))) 647 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 648 .mount(&mock_server) 649 .await; 650 unsafe { 651 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 652 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 653 } 654 let import_res = client 655 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 656 .bearer_auth(&token) 657 .header("Content-Type", "application/vnd.ipld.car") 658 .body(car_bytes.to_vec()) 659 .send() 660 .await 661 .expect("Import failed"); 662 let import_status = import_res.status(); 663 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 664 unsafe { 665 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 666 } 667 assert_eq!( 668 import_status, 669 StatusCode::OK, 670 "Import with valid DID document should succeed. Response: {:?}", 671 import_body 672 ); 673 let get_record_res = client 674 .get(format!( 675 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}", 676 base_url().await, 677 did, 678 original_uri.split('/').last().unwrap() 679 )) 680 .send() 681 .await 682 .expect("Get record failed"); 683 assert_eq!( 684 get_record_res.status(), 685 StatusCode::OK, 686 "Record should be retrievable after import" 687 ); 688 let record_body: Value = get_record_res.json().await.unwrap(); 689 assert_eq!( 690 record_body["value"]["text"], 691 "Test post before migration", 692 "Record content should match" 693 ); 694} 695#[tokio::test] 696async fn test_migration_rejects_wrong_did_document() { 697 let client = client(); 698 let (token, did) = create_account_and_login(&client).await; 699 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng()); 700 let handle = get_user_handle(&did).await 701 .expect("Failed to get user handle"); 702 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 703 let pds_endpoint = format!("https://{}", hostname); 704 let export_res = client 705 .get(format!( 706 "{}/xrpc/com.atproto.sync.getRepo?did={}", 707 base_url().await, 708 did 709 )) 710 .send() 711 .await 712 .expect("Export failed"); 713 assert_eq!(export_res.status(), StatusCode::OK); 714 let car_bytes = export_res.bytes().await.unwrap(); 715 let mock_server = MockServer::start().await; 716 let did_encoded = urlencoding::encode(&did); 717 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint); 718 Mock::given(method("GET")) 719 .and(path(format!("/{}", did_encoded))) 720 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc)) 721 .mount(&mock_server) 722 .await; 723 unsafe { 724 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 725 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 726 } 727 let import_res = client 728 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 729 .bearer_auth(&token) 730 .header("Content-Type", "application/vnd.ipld.car") 731 .body(car_bytes.to_vec()) 732 .send() 733 .await 734 .expect("Import failed"); 735 let import_status = import_res.status(); 736 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 737 unsafe { 738 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 739 } 740 assert_eq!( 741 import_status, 742 StatusCode::BAD_REQUEST, 743 "Import with wrong DID document should fail. Response: {:?}", 744 import_body 745 ); 746 assert!( 747 import_body["error"] == "InvalidSignature" || 748 import_body["message"].as_str().unwrap_or("").contains("signature"), 749 "Error should mention signature verification failure" 750 ); 751} 752#[tokio::test] 753#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"] 754async fn test_full_migration_flow_end_to_end() { 755 let client = client(); 756 let (token, did) = create_account_and_login(&client).await; 757 let key_bytes = get_user_signing_key(&did).await 758 .expect("Failed to get user signing key"); 759 let signing_key = SigningKey::from_slice(&key_bytes) 760 .expect("Failed to create signing key"); 761 let handle = get_user_handle(&did).await 762 .expect("Failed to get user handle"); 763 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 764 let pds_endpoint = format!("https://{}", hostname); 765 let did_key = signing_key_to_did_key(&signing_key); 766 for i in 0..3 { 767 let post_payload = json!({ 768 "repo": did, 769 "collection": "app.bsky.feed.post", 770 "record": { 771 "$type": "app.bsky.feed.post", 772 "text": format!("Pre-migration post #{}", i), 773 "createdAt": chrono::Utc::now().to_rfc3339(), 774 } 775 }); 776 let res = client 777 .post(format!( 778 "{}/xrpc/com.atproto.repo.createRecord", 779 base_url().await 780 )) 781 .bearer_auth(&token) 782 .json(&post_payload) 783 .send() 784 .await 785 .expect("Failed to create post"); 786 assert_eq!(res.status(), StatusCode::OK); 787 } 788 let request_res = client 789 .post(format!( 790 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature", 791 base_url().await 792 )) 793 .bearer_auth(&token) 794 .send() 795 .await 796 .expect("Request failed"); 797 assert_eq!(request_res.status(), StatusCode::OK); 798 let plc_token = get_plc_token_from_db(&did).await 799 .expect("PLC token not found"); 800 let mock_server = MockServer::start().await; 801 let did_encoded = urlencoding::encode(&did); 802 let last_op = json!({ 803 "type": "plc_operation", 804 "rotationKeys": [did_key.clone()], 805 "verificationMethods": { "atproto": did_key.clone() }, 806 "alsoKnownAs": [format!("at://{}", handle)], 807 "services": { 808 "atproto_pds": { 809 "type": "AtprotoPersonalDataServer", 810 "endpoint": pds_endpoint.clone() 811 } 812 }, 813 "prev": null, 814 "sig": "initial_sig" 815 }); 816 Mock::given(method("GET")) 817 .and(path(format!("/{}/log/last", did_encoded))) 818 .respond_with(ResponseTemplate::new(200).set_body_json(last_op)) 819 .mount(&mock_server) 820 .await; 821 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint); 822 Mock::given(method("GET")) 823 .and(path(format!("/{}", did_encoded))) 824 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) 825 .mount(&mock_server) 826 .await; 827 Mock::given(method("POST")) 828 .and(path(format!("/{}", did_encoded))) 829 .respond_with(ResponseTemplate::new(200)) 830 .expect(1) 831 .mount(&mock_server) 832 .await; 833 unsafe { 834 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri()); 835 } 836 let sign_res = client 837 .post(format!( 838 "{}/xrpc/com.atproto.identity.signPlcOperation", 839 base_url().await 840 )) 841 .bearer_auth(&token) 842 .json(&json!({ "token": plc_token })) 843 .send() 844 .await 845 .expect("Sign failed"); 846 assert_eq!(sign_res.status(), StatusCode::OK); 847 let sign_body: Value = sign_res.json().await.unwrap(); 848 let signed_op = sign_body.get("operation").unwrap().clone(); 849 let export_res = client 850 .get(format!( 851 "{}/xrpc/com.atproto.sync.getRepo?did={}", 852 base_url().await, 853 did 854 )) 855 .send() 856 .await 857 .expect("Export failed"); 858 assert_eq!(export_res.status(), StatusCode::OK); 859 let car_bytes = export_res.bytes().await.unwrap(); 860 let submit_res = client 861 .post(format!( 862 "{}/xrpc/com.atproto.identity.submitPlcOperation", 863 base_url().await 864 )) 865 .bearer_auth(&token) 866 .json(&json!({ "operation": signed_op })) 867 .send() 868 .await 869 .expect("Submit failed"); 870 assert_eq!(submit_res.status(), StatusCode::OK); 871 unsafe { 872 std::env::remove_var("SKIP_IMPORT_VERIFICATION"); 873 } 874 let import_res = client 875 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await)) 876 .bearer_auth(&token) 877 .header("Content-Type", "application/vnd.ipld.car") 878 .body(car_bytes.to_vec()) 879 .send() 880 .await 881 .expect("Import failed"); 882 let import_status = import_res.status(); 883 let import_body: Value = import_res.json().await.unwrap_or(json!({})); 884 unsafe { 885 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 886 } 887 assert_eq!( 888 import_status, 889 StatusCode::OK, 890 "Full migration flow should succeed. Response: {:?}", 891 import_body 892 ); 893 let list_res = client 894 .get(format!( 895 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post", 896 base_url().await, 897 did 898 )) 899 .send() 900 .await 901 .expect("List failed"); 902 assert_eq!(list_res.status(), StatusCode::OK); 903 let list_body: Value = list_res.json().await.unwrap(); 904 let records = list_body["records"].as_array() 905 .expect("Should have records array"); 906 assert!( 907 records.len() >= 1, 908 "Should have at least 1 record after migration, found {}", 909 records.len() 910 ); 911}