this repo has no description
1mod common;
2use common::*;
3use k256::ecdsa::SigningKey;
4use reqwest::StatusCode;
5use serde_json::{json, Value};
6use sqlx::PgPool;
7use wiremock::matchers::{method, path};
8use wiremock::{Mock, MockServer, ResponseTemplate};
9
10fn encode_uvarint(mut x: u64) -> Vec<u8> {
11 let mut out = Vec::new();
12 while x >= 0x80 {
13 out.push(((x as u8) & 0x7F) | 0x80);
14 x >>= 7;
15 }
16 out.push(x as u8);
17 out
18}
19
20fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
21 let verifying_key = signing_key.verifying_key();
22 let point = verifying_key.to_encoded_point(true);
23 let compressed_bytes = point.as_bytes();
24 let mut prefixed = vec![0xe7, 0x01];
25 prefixed.extend_from_slice(compressed_bytes);
26 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
27 format!("did:key:{}", encoded)
28}
29
30fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String {
31 let public_key = signing_key.verifying_key();
32 let compressed = public_key.to_sec1_bytes();
33 let mut buf = encode_uvarint(0xE7);
34 buf.extend_from_slice(&compressed);
35 multibase::encode(multibase::Base::Base58Btc, buf)
36}
37
38async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> {
39 let db_url = get_db_connection_string().await;
40 let pool = PgPool::connect(&db_url).await.ok()?;
41 let row = sqlx::query!(
42 r#"
43 SELECT k.key_bytes, k.encryption_version
44 FROM user_keys k
45 JOIN users u ON k.user_id = u.id
46 WHERE u.did = $1
47 "#,
48 did
49 )
50 .fetch_optional(&pool)
51 .await
52 .ok()??;
53 bspds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok()
54}
55
56async fn get_plc_token_from_db(did: &str) -> Option<String> {
57 let db_url = get_db_connection_string().await;
58 let pool = PgPool::connect(&db_url).await.ok()?;
59 sqlx::query_scalar!(
60 r#"
61 SELECT t.token
62 FROM plc_operation_tokens t
63 JOIN users u ON t.user_id = u.id
64 WHERE u.did = $1
65 "#,
66 did
67 )
68 .fetch_optional(&pool)
69 .await
70 .ok()?
71}
72
73async fn get_user_handle(did: &str) -> Option<String> {
74 let db_url = get_db_connection_string().await;
75 let pool = PgPool::connect(&db_url).await.ok()?;
76 sqlx::query_scalar!(
77 r#"SELECT handle FROM users WHERE did = $1"#,
78 did
79 )
80 .fetch_optional(&pool)
81 .await
82 .ok()?
83}
84
85fn create_mock_last_op(
86 _did: &str,
87 handle: &str,
88 signing_key: &SigningKey,
89 pds_endpoint: &str,
90) -> Value {
91 let did_key = signing_key_to_did_key(signing_key);
92 json!({
93 "type": "plc_operation",
94 "rotationKeys": [did_key.clone()],
95 "verificationMethods": {
96 "atproto": did_key
97 },
98 "alsoKnownAs": [format!("at://{}", handle)],
99 "services": {
100 "atproto_pds": {
101 "type": "AtprotoPersonalDataServer",
102 "endpoint": pds_endpoint
103 }
104 },
105 "prev": null,
106 "sig": "mock_signature_for_testing"
107 })
108}
109
110fn create_did_document(did: &str, handle: &str, signing_key: &SigningKey, pds_endpoint: &str) -> Value {
111 let multikey = get_multikey_from_signing_key(signing_key);
112 json!({
113 "@context": [
114 "https://www.w3.org/ns/did/v1",
115 "https://w3id.org/security/multikey/v1"
116 ],
117 "id": did,
118 "alsoKnownAs": [format!("at://{}", handle)],
119 "verificationMethod": [{
120 "id": format!("{}#atproto", did),
121 "type": "Multikey",
122 "controller": did,
123 "publicKeyMultibase": multikey
124 }],
125 "service": [{
126 "id": "#atproto_pds",
127 "type": "AtprotoPersonalDataServer",
128 "serviceEndpoint": pds_endpoint
129 }]
130 })
131}
132
133async fn setup_mock_plc_for_sign(
134 did: &str,
135 handle: &str,
136 signing_key: &SigningKey,
137 pds_endpoint: &str,
138) -> MockServer {
139 let mock_server = MockServer::start().await;
140 let did_encoded = urlencoding::encode(did);
141 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint);
142 Mock::given(method("GET"))
143 .and(path(format!("/{}/log/last", did_encoded)))
144 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
145 .mount(&mock_server)
146 .await;
147 mock_server
148}
149
150async fn setup_mock_plc_for_submit(
151 did: &str,
152 handle: &str,
153 signing_key: &SigningKey,
154 pds_endpoint: &str,
155) -> MockServer {
156 let mock_server = MockServer::start().await;
157 let did_encoded = urlencoding::encode(did);
158 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint);
159 Mock::given(method("GET"))
160 .and(path(format!("/{}", did_encoded)))
161 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone()))
162 .mount(&mock_server)
163 .await;
164 Mock::given(method("POST"))
165 .and(path(format!("/{}", did_encoded)))
166 .respond_with(ResponseTemplate::new(200))
167 .mount(&mock_server)
168 .await;
169 mock_server
170}
171
172#[tokio::test]
173#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
174async fn test_full_plc_operation_flow() {
175 let client = client();
176 let (token, did) = create_account_and_login(&client).await;
177 let key_bytes = get_user_signing_key(&did).await
178 .expect("Failed to get user signing key");
179 let signing_key = SigningKey::from_slice(&key_bytes)
180 .expect("Failed to create signing key");
181 let handle = get_user_handle(&did).await
182 .expect("Failed to get user handle");
183 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
184 let pds_endpoint = format!("https://{}", hostname);
185 let request_res = client
186 .post(format!(
187 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
188 base_url().await
189 ))
190 .bearer_auth(&token)
191 .send()
192 .await
193 .expect("Request failed");
194 assert_eq!(request_res.status(), StatusCode::OK);
195 let plc_token = get_plc_token_from_db(&did).await
196 .expect("PLC token not found in database");
197 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
198 unsafe {
199 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
200 }
201 let sign_res = client
202 .post(format!(
203 "{}/xrpc/com.atproto.identity.signPlcOperation",
204 base_url().await
205 ))
206 .bearer_auth(&token)
207 .json(&json!({
208 "token": plc_token
209 }))
210 .send()
211 .await
212 .expect("Sign request failed");
213 let sign_status = sign_res.status();
214 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
215 assert_eq!(
216 sign_status,
217 StatusCode::OK,
218 "Sign PLC operation should succeed. Response: {:?}",
219 sign_body
220 );
221 let operation = sign_body.get("operation")
222 .expect("Response should contain operation");
223 assert!(operation.get("sig").is_some(), "Operation should be signed");
224 assert_eq!(operation.get("type").and_then(|v| v.as_str()), Some("plc_operation"));
225 assert!(operation.get("prev").is_some(), "Operation should have prev reference");
226}
227
228#[tokio::test]
229#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"]
230async fn test_sign_plc_operation_consumes_token() {
231 let client = client();
232 let (token, did) = create_account_and_login(&client).await;
233 let key_bytes = get_user_signing_key(&did).await
234 .expect("Failed to get user signing key");
235 let signing_key = SigningKey::from_slice(&key_bytes)
236 .expect("Failed to create signing key");
237 let handle = get_user_handle(&did).await
238 .expect("Failed to get user handle");
239 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
240 let pds_endpoint = format!("https://{}", hostname);
241 let request_res = client
242 .post(format!(
243 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
244 base_url().await
245 ))
246 .bearer_auth(&token)
247 .send()
248 .await
249 .expect("Request failed");
250 assert_eq!(request_res.status(), StatusCode::OK);
251 let plc_token = get_plc_token_from_db(&did).await
252 .expect("PLC token not found in database");
253 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
254 unsafe {
255 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
256 }
257 let sign_res = client
258 .post(format!(
259 "{}/xrpc/com.atproto.identity.signPlcOperation",
260 base_url().await
261 ))
262 .bearer_auth(&token)
263 .json(&json!({
264 "token": plc_token
265 }))
266 .send()
267 .await
268 .expect("Sign request failed");
269 assert_eq!(sign_res.status(), StatusCode::OK);
270 let sign_res_2 = client
271 .post(format!(
272 "{}/xrpc/com.atproto.identity.signPlcOperation",
273 base_url().await
274 ))
275 .bearer_auth(&token)
276 .json(&json!({
277 "token": plc_token
278 }))
279 .send()
280 .await
281 .expect("Second sign request failed");
282 assert_eq!(
283 sign_res_2.status(),
284 StatusCode::BAD_REQUEST,
285 "Using the same token twice should fail"
286 );
287 let body: Value = sign_res_2.json().await.unwrap();
288 assert!(
289 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken",
290 "Error should indicate invalid/expired token"
291 );
292}
293
294#[tokio::test]
295async fn test_sign_plc_operation_with_custom_fields() {
296 let client = client();
297 let (token, did) = create_account_and_login(&client).await;
298 let key_bytes = get_user_signing_key(&did).await
299 .expect("Failed to get user signing key");
300 let signing_key = SigningKey::from_slice(&key_bytes)
301 .expect("Failed to create signing key");
302 let handle = get_user_handle(&did).await
303 .expect("Failed to get user handle");
304 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
305 let pds_endpoint = format!("https://{}", hostname);
306 let request_res = client
307 .post(format!(
308 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
309 base_url().await
310 ))
311 .bearer_auth(&token)
312 .send()
313 .await
314 .expect("Request failed");
315 assert_eq!(request_res.status(), StatusCode::OK);
316 let plc_token = get_plc_token_from_db(&did).await
317 .expect("PLC token not found in database");
318 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
319 unsafe {
320 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
321 }
322 let did_key = signing_key_to_did_key(&signing_key);
323 let sign_res = client
324 .post(format!(
325 "{}/xrpc/com.atproto.identity.signPlcOperation",
326 base_url().await
327 ))
328 .bearer_auth(&token)
329 .json(&json!({
330 "token": plc_token,
331 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"],
332 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"]
333 }))
334 .send()
335 .await
336 .expect("Sign request failed");
337 let sign_status = sign_res.status();
338 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
339 assert_eq!(
340 sign_status,
341 StatusCode::OK,
342 "Sign with custom fields should succeed. Response: {:?}",
343 sign_body
344 );
345 let operation = sign_body.get("operation").expect("Should have operation");
346 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array());
347 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array());
348 assert!(also_known_as.is_some(), "Should have alsoKnownAs");
349 assert!(rotation_keys.is_some(), "Should have rotationKeys");
350 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases");
351 assert_eq!(rotation_keys.unwrap().len(), 2, "Should have 2 rotation keys");
352}
353
354#[tokio::test]
355#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
356async fn test_submit_plc_operation_success() {
357 let client = client();
358 let (token, did) = create_account_and_login(&client).await;
359 let key_bytes = get_user_signing_key(&did).await
360 .expect("Failed to get user signing key");
361 let signing_key = SigningKey::from_slice(&key_bytes)
362 .expect("Failed to create signing key");
363 let handle = get_user_handle(&did).await
364 .expect("Failed to get user handle");
365 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
366 let pds_endpoint = format!("https://{}", hostname);
367 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
368 unsafe {
369 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
370 }
371 let did_key = signing_key_to_did_key(&signing_key);
372 let operation = json!({
373 "type": "plc_operation",
374 "rotationKeys": [did_key.clone()],
375 "verificationMethods": {
376 "atproto": did_key.clone()
377 },
378 "alsoKnownAs": [format!("at://{}", handle)],
379 "services": {
380 "atproto_pds": {
381 "type": "AtprotoPersonalDataServer",
382 "endpoint": pds_endpoint
383 }
384 },
385 "prev": "bafyreiabc123",
386 "sig": "test_signature_base64"
387 });
388 let submit_res = client
389 .post(format!(
390 "{}/xrpc/com.atproto.identity.submitPlcOperation",
391 base_url().await
392 ))
393 .bearer_auth(&token)
394 .json(&json!({ "operation": operation }))
395 .send()
396 .await
397 .expect("Submit request failed");
398 let submit_status = submit_res.status();
399 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
400 assert_eq!(
401 submit_status,
402 StatusCode::OK,
403 "Submit PLC operation should succeed. Response: {:?}",
404 submit_body
405 );
406}
407
408#[tokio::test]
409async fn test_submit_plc_operation_wrong_endpoint_rejected() {
410 let client = client();
411 let (token, did) = create_account_and_login(&client).await;
412 let key_bytes = get_user_signing_key(&did).await
413 .expect("Failed to get user signing key");
414 let signing_key = SigningKey::from_slice(&key_bytes)
415 .expect("Failed to create signing key");
416 let handle = get_user_handle(&did).await
417 .expect("Failed to get user handle");
418 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
419 let pds_endpoint = format!("https://{}", hostname);
420 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
421 unsafe {
422 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
423 }
424 let did_key = signing_key_to_did_key(&signing_key);
425 let operation = json!({
426 "type": "plc_operation",
427 "rotationKeys": [did_key.clone()],
428 "verificationMethods": {
429 "atproto": did_key.clone()
430 },
431 "alsoKnownAs": [format!("at://{}", handle)],
432 "services": {
433 "atproto_pds": {
434 "type": "AtprotoPersonalDataServer",
435 "endpoint": "https://wrong-pds.example.com"
436 }
437 },
438 "prev": "bafyreiabc123",
439 "sig": "test_signature_base64"
440 });
441 let submit_res = client
442 .post(format!(
443 "{}/xrpc/com.atproto.identity.submitPlcOperation",
444 base_url().await
445 ))
446 .bearer_auth(&token)
447 .json(&json!({ "operation": operation }))
448 .send()
449 .await
450 .expect("Submit request failed");
451 assert_eq!(
452 submit_res.status(),
453 StatusCode::BAD_REQUEST,
454 "Submit with wrong endpoint should fail"
455 );
456 let body: Value = submit_res.json().await.unwrap();
457 assert_eq!(body["error"], "InvalidRequest");
458}
459
460#[tokio::test]
461async fn test_submit_plc_operation_wrong_signing_key_rejected() {
462 let client = client();
463 let (token, did) = create_account_and_login(&client).await;
464 let key_bytes = get_user_signing_key(&did).await
465 .expect("Failed to get user signing key");
466 let signing_key = SigningKey::from_slice(&key_bytes)
467 .expect("Failed to create signing key");
468 let handle = get_user_handle(&did).await
469 .expect("Failed to get user handle");
470 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
471 let pds_endpoint = format!("https://{}", hostname);
472 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
473 unsafe {
474 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
475 }
476 let wrong_key = SigningKey::random(&mut rand::thread_rng());
477 let wrong_did_key = signing_key_to_did_key(&wrong_key);
478 let correct_did_key = signing_key_to_did_key(&signing_key);
479 let operation = json!({
480 "type": "plc_operation",
481 "rotationKeys": [correct_did_key.clone()],
482 "verificationMethods": {
483 "atproto": wrong_did_key
484 },
485 "alsoKnownAs": [format!("at://{}", handle)],
486 "services": {
487 "atproto_pds": {
488 "type": "AtprotoPersonalDataServer",
489 "endpoint": pds_endpoint
490 }
491 },
492 "prev": "bafyreiabc123",
493 "sig": "test_signature_base64"
494 });
495 let submit_res = client
496 .post(format!(
497 "{}/xrpc/com.atproto.identity.submitPlcOperation",
498 base_url().await
499 ))
500 .bearer_auth(&token)
501 .json(&json!({ "operation": operation }))
502 .send()
503 .await
504 .expect("Submit request failed");
505 assert_eq!(
506 submit_res.status(),
507 StatusCode::BAD_REQUEST,
508 "Submit with wrong signing key should fail"
509 );
510 let body: Value = submit_res.json().await.unwrap();
511 assert_eq!(body["error"], "InvalidRequest");
512}
513
514#[tokio::test]
515async fn test_full_sign_and_submit_flow() {
516 let client = client();
517 let (token, did) = create_account_and_login(&client).await;
518 let key_bytes = get_user_signing_key(&did).await
519 .expect("Failed to get user signing key");
520 let signing_key = SigningKey::from_slice(&key_bytes)
521 .expect("Failed to create signing key");
522 let handle = get_user_handle(&did).await
523 .expect("Failed to get user handle");
524 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
525 let pds_endpoint = format!("https://{}", hostname);
526 let request_res = client
527 .post(format!(
528 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
529 base_url().await
530 ))
531 .bearer_auth(&token)
532 .send()
533 .await
534 .expect("Request failed");
535 assert_eq!(request_res.status(), StatusCode::OK);
536 let plc_token = get_plc_token_from_db(&did).await
537 .expect("PLC token not found");
538 let mock_server = MockServer::start().await;
539 let did_encoded = urlencoding::encode(&did);
540 let did_key = signing_key_to_did_key(&signing_key);
541 let last_op = json!({
542 "type": "plc_operation",
543 "rotationKeys": [did_key.clone()],
544 "verificationMethods": {
545 "atproto": did_key.clone()
546 },
547 "alsoKnownAs": [format!("at://{}", handle)],
548 "services": {
549 "atproto_pds": {
550 "type": "AtprotoPersonalDataServer",
551 "endpoint": pds_endpoint.clone()
552 }
553 },
554 "prev": null,
555 "sig": "initial_sig"
556 });
557 Mock::given(method("GET"))
558 .and(path(format!("/{}/log/last", did_encoded)))
559 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
560 .mount(&mock_server)
561 .await;
562 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
563 Mock::given(method("GET"))
564 .and(path(format!("/{}", did_encoded)))
565 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
566 .mount(&mock_server)
567 .await;
568 Mock::given(method("POST"))
569 .and(path(format!("/{}", did_encoded)))
570 .respond_with(ResponseTemplate::new(200))
571 .expect(1)
572 .mount(&mock_server)
573 .await;
574 unsafe {
575 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
576 }
577 let sign_res = client
578 .post(format!(
579 "{}/xrpc/com.atproto.identity.signPlcOperation",
580 base_url().await
581 ))
582 .bearer_auth(&token)
583 .json(&json!({ "token": plc_token }))
584 .send()
585 .await
586 .expect("Sign failed");
587 assert_eq!(sign_res.status(), StatusCode::OK);
588 let sign_body: Value = sign_res.json().await.unwrap();
589 let signed_operation = sign_body.get("operation")
590 .expect("Response should contain operation")
591 .clone();
592 assert!(signed_operation.get("sig").is_some());
593 assert!(signed_operation.get("prev").is_some());
594 let submit_res = client
595 .post(format!(
596 "{}/xrpc/com.atproto.identity.submitPlcOperation",
597 base_url().await
598 ))
599 .bearer_auth(&token)
600 .json(&json!({ "operation": signed_operation }))
601 .send()
602 .await
603 .expect("Submit failed");
604 let submit_status = submit_res.status();
605 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
606 assert_eq!(
607 submit_status,
608 StatusCode::OK,
609 "Full sign and submit flow should succeed. Response: {:?}",
610 submit_body
611 );
612}
613
614#[tokio::test]
615async fn test_cross_pds_migration_with_records() {
616 let client = client();
617 let (token, did) = create_account_and_login(&client).await;
618 let key_bytes = get_user_signing_key(&did).await
619 .expect("Failed to get user signing key");
620 let signing_key = SigningKey::from_slice(&key_bytes)
621 .expect("Failed to create signing key");
622 let handle = get_user_handle(&did).await
623 .expect("Failed to get user handle");
624 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
625 let pds_endpoint = format!("https://{}", hostname);
626 let post_payload = json!({
627 "repo": did,
628 "collection": "app.bsky.feed.post",
629 "record": {
630 "$type": "app.bsky.feed.post",
631 "text": "Test post before migration",
632 "createdAt": chrono::Utc::now().to_rfc3339(),
633 }
634 });
635 let create_res = client
636 .post(format!(
637 "{}/xrpc/com.atproto.repo.createRecord",
638 base_url().await
639 ))
640 .bearer_auth(&token)
641 .json(&post_payload)
642 .send()
643 .await
644 .expect("Failed to create post");
645 assert_eq!(create_res.status(), StatusCode::OK);
646 let create_body: Value = create_res.json().await.unwrap();
647 let original_uri = create_body["uri"].as_str().unwrap().to_string();
648 let export_res = client
649 .get(format!(
650 "{}/xrpc/com.atproto.sync.getRepo?did={}",
651 base_url().await,
652 did
653 ))
654 .send()
655 .await
656 .expect("Export failed");
657 assert_eq!(export_res.status(), StatusCode::OK);
658 let car_bytes = export_res.bytes().await.unwrap();
659 assert!(car_bytes.len() > 100, "CAR file should have meaningful content");
660 let mock_server = MockServer::start().await;
661 let did_encoded = urlencoding::encode(&did);
662 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
663 Mock::given(method("GET"))
664 .and(path(format!("/{}", did_encoded)))
665 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
666 .mount(&mock_server)
667 .await;
668 unsafe {
669 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
670 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
671 }
672 let import_res = client
673 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
674 .bearer_auth(&token)
675 .header("Content-Type", "application/vnd.ipld.car")
676 .body(car_bytes.to_vec())
677 .send()
678 .await
679 .expect("Import failed");
680 let import_status = import_res.status();
681 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
682 unsafe {
683 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
684 }
685 assert_eq!(
686 import_status,
687 StatusCode::OK,
688 "Import with valid DID document should succeed. Response: {:?}",
689 import_body
690 );
691 let get_record_res = client
692 .get(format!(
693 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}",
694 base_url().await,
695 did,
696 original_uri.split('/').last().unwrap()
697 ))
698 .send()
699 .await
700 .expect("Get record failed");
701 assert_eq!(
702 get_record_res.status(),
703 StatusCode::OK,
704 "Record should be retrievable after import"
705 );
706 let record_body: Value = get_record_res.json().await.unwrap();
707 assert_eq!(
708 record_body["value"]["text"],
709 "Test post before migration",
710 "Record content should match"
711 );
712}
713
714#[tokio::test]
715async fn test_migration_rejects_wrong_did_document() {
716 let client = client();
717 let (token, did) = create_account_and_login(&client).await;
718 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng());
719 let handle = get_user_handle(&did).await
720 .expect("Failed to get user handle");
721 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
722 let pds_endpoint = format!("https://{}", hostname);
723 let export_res = client
724 .get(format!(
725 "{}/xrpc/com.atproto.sync.getRepo?did={}",
726 base_url().await,
727 did
728 ))
729 .send()
730 .await
731 .expect("Export failed");
732 assert_eq!(export_res.status(), StatusCode::OK);
733 let car_bytes = export_res.bytes().await.unwrap();
734 let mock_server = MockServer::start().await;
735 let did_encoded = urlencoding::encode(&did);
736 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint);
737 Mock::given(method("GET"))
738 .and(path(format!("/{}", did_encoded)))
739 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc))
740 .mount(&mock_server)
741 .await;
742 unsafe {
743 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
744 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
745 }
746 let import_res = client
747 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
748 .bearer_auth(&token)
749 .header("Content-Type", "application/vnd.ipld.car")
750 .body(car_bytes.to_vec())
751 .send()
752 .await
753 .expect("Import failed");
754 let import_status = import_res.status();
755 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
756 unsafe {
757 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
758 }
759 assert_eq!(
760 import_status,
761 StatusCode::BAD_REQUEST,
762 "Import with wrong DID document should fail. Response: {:?}",
763 import_body
764 );
765 assert!(
766 import_body["error"] == "InvalidSignature" ||
767 import_body["message"].as_str().unwrap_or("").contains("signature"),
768 "Error should mention signature verification failure"
769 );
770}
771
772#[tokio::test]
773#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"]
774async fn test_full_migration_flow_end_to_end() {
775 let client = client();
776 let (token, did) = create_account_and_login(&client).await;
777 let key_bytes = get_user_signing_key(&did).await
778 .expect("Failed to get user signing key");
779 let signing_key = SigningKey::from_slice(&key_bytes)
780 .expect("Failed to create signing key");
781 let handle = get_user_handle(&did).await
782 .expect("Failed to get user handle");
783 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
784 let pds_endpoint = format!("https://{}", hostname);
785 let did_key = signing_key_to_did_key(&signing_key);
786 for i in 0..3 {
787 let post_payload = json!({
788 "repo": did,
789 "collection": "app.bsky.feed.post",
790 "record": {
791 "$type": "app.bsky.feed.post",
792 "text": format!("Pre-migration post #{}", i),
793 "createdAt": chrono::Utc::now().to_rfc3339(),
794 }
795 });
796 let res = client
797 .post(format!(
798 "{}/xrpc/com.atproto.repo.createRecord",
799 base_url().await
800 ))
801 .bearer_auth(&token)
802 .json(&post_payload)
803 .send()
804 .await
805 .expect("Failed to create post");
806 assert_eq!(res.status(), StatusCode::OK);
807 }
808 let request_res = client
809 .post(format!(
810 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
811 base_url().await
812 ))
813 .bearer_auth(&token)
814 .send()
815 .await
816 .expect("Request failed");
817 assert_eq!(request_res.status(), StatusCode::OK);
818 let plc_token = get_plc_token_from_db(&did).await
819 .expect("PLC token not found");
820 let mock_server = MockServer::start().await;
821 let did_encoded = urlencoding::encode(&did);
822 let last_op = json!({
823 "type": "plc_operation",
824 "rotationKeys": [did_key.clone()],
825 "verificationMethods": { "atproto": did_key.clone() },
826 "alsoKnownAs": [format!("at://{}", handle)],
827 "services": {
828 "atproto_pds": {
829 "type": "AtprotoPersonalDataServer",
830 "endpoint": pds_endpoint.clone()
831 }
832 },
833 "prev": null,
834 "sig": "initial_sig"
835 });
836 Mock::given(method("GET"))
837 .and(path(format!("/{}/log/last", did_encoded)))
838 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
839 .mount(&mock_server)
840 .await;
841 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
842 Mock::given(method("GET"))
843 .and(path(format!("/{}", did_encoded)))
844 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
845 .mount(&mock_server)
846 .await;
847 Mock::given(method("POST"))
848 .and(path(format!("/{}", did_encoded)))
849 .respond_with(ResponseTemplate::new(200))
850 .expect(1)
851 .mount(&mock_server)
852 .await;
853 unsafe {
854 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
855 }
856 let sign_res = client
857 .post(format!(
858 "{}/xrpc/com.atproto.identity.signPlcOperation",
859 base_url().await
860 ))
861 .bearer_auth(&token)
862 .json(&json!({ "token": plc_token }))
863 .send()
864 .await
865 .expect("Sign failed");
866 assert_eq!(sign_res.status(), StatusCode::OK);
867 let sign_body: Value = sign_res.json().await.unwrap();
868 let signed_op = sign_body.get("operation").unwrap().clone();
869 let export_res = client
870 .get(format!(
871 "{}/xrpc/com.atproto.sync.getRepo?did={}",
872 base_url().await,
873 did
874 ))
875 .send()
876 .await
877 .expect("Export failed");
878 assert_eq!(export_res.status(), StatusCode::OK);
879 let car_bytes = export_res.bytes().await.unwrap();
880 let submit_res = client
881 .post(format!(
882 "{}/xrpc/com.atproto.identity.submitPlcOperation",
883 base_url().await
884 ))
885 .bearer_auth(&token)
886 .json(&json!({ "operation": signed_op }))
887 .send()
888 .await
889 .expect("Submit failed");
890 assert_eq!(submit_res.status(), StatusCode::OK);
891 unsafe {
892 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
893 }
894 let import_res = client
895 .post(format!("{}/xrpc/com.atproto.repo.importRepo", base_url().await))
896 .bearer_auth(&token)
897 .header("Content-Type", "application/vnd.ipld.car")
898 .body(car_bytes.to_vec())
899 .send()
900 .await
901 .expect("Import failed");
902 let import_status = import_res.status();
903 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
904 unsafe {
905 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
906 }
907 assert_eq!(
908 import_status,
909 StatusCode::OK,
910 "Full migration flow should succeed. Response: {:?}",
911 import_body
912 );
913 let list_res = client
914 .get(format!(
915 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post",
916 base_url().await,
917 did
918 ))
919 .send()
920 .await
921 .expect("List failed");
922 assert_eq!(list_res.status(), StatusCode::OK);
923 let list_body: Value = list_res.json().await.unwrap();
924 let records = list_body["records"].as_array()
925 .expect("Should have records array");
926 assert!(
927 records.len() >= 1,
928 "Should have at least 1 record after migration, found {}",
929 records.len()
930 );
931}