this repo has no description
1mod common;
2use common::*;
3use k256::ecdsa::SigningKey;
4use reqwest::StatusCode;
5use serde_json::{Value, json};
6use sqlx::PgPool;
7use wiremock::matchers::{method, path};
8use wiremock::{Mock, MockServer, ResponseTemplate};
9
10fn encode_uvarint(mut x: u64) -> Vec<u8> {
11 let mut out = Vec::new();
12 while x >= 0x80 {
13 out.push(((x as u8) & 0x7F) | 0x80);
14 x >>= 7;
15 }
16 out.push(x as u8);
17 out
18}
19
20fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
21 let verifying_key = signing_key.verifying_key();
22 let point = verifying_key.to_encoded_point(true);
23 let compressed_bytes = point.as_bytes();
24 let mut prefixed = vec![0xe7, 0x01];
25 prefixed.extend_from_slice(compressed_bytes);
26 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
27 format!("did:key:{}", encoded)
28}
29
30fn get_multikey_from_signing_key(signing_key: &SigningKey) -> String {
31 let public_key = signing_key.verifying_key();
32 let compressed = public_key.to_sec1_bytes();
33 let mut buf = encode_uvarint(0xE7);
34 buf.extend_from_slice(&compressed);
35 multibase::encode(multibase::Base::Base58Btc, buf)
36}
37
38async fn get_user_signing_key(did: &str) -> Option<Vec<u8>> {
39 let db_url = get_db_connection_string().await;
40 let pool = PgPool::connect(&db_url).await.ok()?;
41 let row = sqlx::query!(
42 r#"
43 SELECT k.key_bytes, k.encryption_version
44 FROM user_keys k
45 JOIN users u ON k.user_id = u.id
46 WHERE u.did = $1
47 "#,
48 did
49 )
50 .fetch_optional(&pool)
51 .await
52 .ok()??;
53 tranquil_pds::config::decrypt_key(&row.key_bytes, row.encryption_version).ok()
54}
55
56async fn get_plc_token_from_db(did: &str) -> Option<String> {
57 let db_url = get_db_connection_string().await;
58 let pool = PgPool::connect(&db_url).await.ok()?;
59 sqlx::query_scalar!(
60 r#"
61 SELECT t.token
62 FROM plc_operation_tokens t
63 JOIN users u ON t.user_id = u.id
64 WHERE u.did = $1
65 "#,
66 did
67 )
68 .fetch_optional(&pool)
69 .await
70 .ok()?
71}
72
73async fn get_user_handle(did: &str) -> Option<String> {
74 let db_url = get_db_connection_string().await;
75 let pool = PgPool::connect(&db_url).await.ok()?;
76 sqlx::query_scalar!(r#"SELECT handle FROM users WHERE did = $1"#, did)
77 .fetch_optional(&pool)
78 .await
79 .ok()?
80}
81
82fn create_mock_last_op(
83 _did: &str,
84 handle: &str,
85 signing_key: &SigningKey,
86 pds_endpoint: &str,
87) -> Value {
88 let did_key = signing_key_to_did_key(signing_key);
89 json!({
90 "type": "plc_operation",
91 "rotationKeys": [did_key.clone()],
92 "verificationMethods": {
93 "atproto": did_key
94 },
95 "alsoKnownAs": [format!("at://{}", handle)],
96 "services": {
97 "atproto_pds": {
98 "type": "AtprotoPersonalDataServer",
99 "endpoint": pds_endpoint
100 }
101 },
102 "prev": null,
103 "sig": "mock_signature_for_testing"
104 })
105}
106
107fn create_did_document(
108 did: &str,
109 handle: &str,
110 signing_key: &SigningKey,
111 pds_endpoint: &str,
112) -> Value {
113 let multikey = get_multikey_from_signing_key(signing_key);
114 json!({
115 "@context": [
116 "https://www.w3.org/ns/did/v1",
117 "https://w3id.org/security/multikey/v1"
118 ],
119 "id": did,
120 "alsoKnownAs": [format!("at://{}", handle)],
121 "verificationMethod": [{
122 "id": format!("{}#atproto", did),
123 "type": "Multikey",
124 "controller": did,
125 "publicKeyMultibase": multikey
126 }],
127 "service": [{
128 "id": "#atproto_pds",
129 "type": "AtprotoPersonalDataServer",
130 "serviceEndpoint": pds_endpoint
131 }]
132 })
133}
134
135async fn setup_mock_plc_for_sign(
136 did: &str,
137 handle: &str,
138 signing_key: &SigningKey,
139 pds_endpoint: &str,
140) -> MockServer {
141 let mock_server = MockServer::start().await;
142 let did_encoded = urlencoding::encode(did);
143 let last_op = create_mock_last_op(did, handle, signing_key, pds_endpoint);
144 Mock::given(method("GET"))
145 .and(path(format!("/{}/log/last", did_encoded)))
146 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
147 .mount(&mock_server)
148 .await;
149 mock_server
150}
151
152async fn setup_mock_plc_for_submit(
153 did: &str,
154 handle: &str,
155 signing_key: &SigningKey,
156 pds_endpoint: &str,
157) -> MockServer {
158 let mock_server = MockServer::start().await;
159 let did_encoded = urlencoding::encode(did);
160 let did_doc = create_did_document(did, handle, signing_key, pds_endpoint);
161 Mock::given(method("GET"))
162 .and(path(format!("/{}", did_encoded)))
163 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc.clone()))
164 .mount(&mock_server)
165 .await;
166 Mock::given(method("POST"))
167 .and(path(format!("/{}", did_encoded)))
168 .respond_with(ResponseTemplate::new(200))
169 .mount(&mock_server)
170 .await;
171 mock_server
172}
173
174#[tokio::test]
175#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
176async fn test_full_plc_operation_flow() {
177 let client = client();
178 let (token, did) = create_account_and_login(&client).await;
179 let key_bytes = get_user_signing_key(&did)
180 .await
181 .expect("Failed to get user signing key");
182 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
183 let handle = get_user_handle(&did)
184 .await
185 .expect("Failed to get user handle");
186 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
187 let pds_endpoint = format!("https://{}", hostname);
188 let request_res = client
189 .post(format!(
190 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
191 base_url().await
192 ))
193 .bearer_auth(&token)
194 .send()
195 .await
196 .expect("Request failed");
197 assert_eq!(request_res.status(), StatusCode::OK);
198 let plc_token = get_plc_token_from_db(&did)
199 .await
200 .expect("PLC token not found in database");
201 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
202 unsafe {
203 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
204 }
205 let sign_res = client
206 .post(format!(
207 "{}/xrpc/com.atproto.identity.signPlcOperation",
208 base_url().await
209 ))
210 .bearer_auth(&token)
211 .json(&json!({
212 "token": plc_token
213 }))
214 .send()
215 .await
216 .expect("Sign request failed");
217 let sign_status = sign_res.status();
218 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
219 assert_eq!(
220 sign_status,
221 StatusCode::OK,
222 "Sign PLC operation should succeed. Response: {:?}",
223 sign_body
224 );
225 let operation = sign_body
226 .get("operation")
227 .expect("Response should contain operation");
228 assert!(operation.get("sig").is_some(), "Operation should be signed");
229 assert_eq!(
230 operation.get("type").and_then(|v| v.as_str()),
231 Some("plc_operation")
232 );
233 assert!(
234 operation.get("prev").is_some(),
235 "Operation should have prev reference"
236 );
237}
238
239#[tokio::test]
240#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_consumes_token -- --ignored --test-threads=1"]
241async fn test_sign_plc_operation_consumes_token() {
242 let client = client();
243 let (token, did) = create_account_and_login(&client).await;
244 let key_bytes = get_user_signing_key(&did)
245 .await
246 .expect("Failed to get user signing key");
247 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
248 let handle = get_user_handle(&did)
249 .await
250 .expect("Failed to get user handle");
251 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
252 let pds_endpoint = format!("https://{}", hostname);
253 let request_res = client
254 .post(format!(
255 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
256 base_url().await
257 ))
258 .bearer_auth(&token)
259 .send()
260 .await
261 .expect("Request failed");
262 assert_eq!(request_res.status(), StatusCode::OK);
263 let plc_token = get_plc_token_from_db(&did)
264 .await
265 .expect("PLC token not found in database");
266 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
267 unsafe {
268 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
269 }
270 let sign_res = client
271 .post(format!(
272 "{}/xrpc/com.atproto.identity.signPlcOperation",
273 base_url().await
274 ))
275 .bearer_auth(&token)
276 .json(&json!({
277 "token": plc_token
278 }))
279 .send()
280 .await
281 .expect("Sign request failed");
282 assert_eq!(sign_res.status(), StatusCode::OK);
283 let sign_res_2 = client
284 .post(format!(
285 "{}/xrpc/com.atproto.identity.signPlcOperation",
286 base_url().await
287 ))
288 .bearer_auth(&token)
289 .json(&json!({
290 "token": plc_token
291 }))
292 .send()
293 .await
294 .expect("Second sign request failed");
295 assert_eq!(
296 sign_res_2.status(),
297 StatusCode::BAD_REQUEST,
298 "Using the same token twice should fail"
299 );
300 let body: Value = sign_res_2.json().await.unwrap();
301 assert!(
302 body["error"] == "InvalidToken" || body["error"] == "ExpiredToken",
303 "Error should indicate invalid/expired token"
304 );
305}
306
307#[tokio::test]
308#[ignore = "requires exclusive env var access; run with: cargo test test_sign_plc_operation_with_custom_fields -- --ignored --test-threads=1"]
309async fn test_sign_plc_operation_with_custom_fields() {
310 let client = client();
311 let (token, did) = create_account_and_login(&client).await;
312 let key_bytes = get_user_signing_key(&did)
313 .await
314 .expect("Failed to get user signing key");
315 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
316 let handle = get_user_handle(&did)
317 .await
318 .expect("Failed to get user handle");
319 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
320 let pds_endpoint = format!("https://{}", hostname);
321 let request_res = client
322 .post(format!(
323 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
324 base_url().await
325 ))
326 .bearer_auth(&token)
327 .send()
328 .await
329 .expect("Request failed");
330 assert_eq!(request_res.status(), StatusCode::OK);
331 let plc_token = get_plc_token_from_db(&did)
332 .await
333 .expect("PLC token not found in database");
334 let mock_plc = setup_mock_plc_for_sign(&did, &handle, &signing_key, &pds_endpoint).await;
335 unsafe {
336 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
337 }
338 let did_key = signing_key_to_did_key(&signing_key);
339 let sign_res = client
340 .post(format!(
341 "{}/xrpc/com.atproto.identity.signPlcOperation",
342 base_url().await
343 ))
344 .bearer_auth(&token)
345 .json(&json!({
346 "token": plc_token,
347 "alsoKnownAs": [format!("at://{}", handle), "at://custom.alias.example"],
348 "rotationKeys": [did_key.clone(), "did:key:zExtraRotationKey123"]
349 }))
350 .send()
351 .await
352 .expect("Sign request failed");
353 let sign_status = sign_res.status();
354 let sign_body: Value = sign_res.json().await.unwrap_or(json!({}));
355 assert_eq!(
356 sign_status,
357 StatusCode::OK,
358 "Sign with custom fields should succeed. Response: {:?}",
359 sign_body
360 );
361 let operation = sign_body.get("operation").expect("Should have operation");
362 let also_known_as = operation.get("alsoKnownAs").and_then(|v| v.as_array());
363 let rotation_keys = operation.get("rotationKeys").and_then(|v| v.as_array());
364 assert!(also_known_as.is_some(), "Should have alsoKnownAs");
365 assert!(rotation_keys.is_some(), "Should have rotationKeys");
366 assert_eq!(also_known_as.unwrap().len(), 2, "Should have 2 aliases");
367 assert_eq!(
368 rotation_keys.unwrap().len(),
369 2,
370 "Should have 2 rotation keys"
371 );
372}
373
374#[tokio::test]
375#[ignore = "requires mock PLC server setup that is flaky; run manually with --ignored"]
376async fn test_submit_plc_operation_success() {
377 let client = client();
378 let (token, did) = create_account_and_login(&client).await;
379 let key_bytes = get_user_signing_key(&did)
380 .await
381 .expect("Failed to get user signing key");
382 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
383 let handle = get_user_handle(&did)
384 .await
385 .expect("Failed to get user handle");
386 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
387 let pds_endpoint = format!("https://{}", hostname);
388 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
389 unsafe {
390 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
391 }
392 let did_key = signing_key_to_did_key(&signing_key);
393 let operation = json!({
394 "type": "plc_operation",
395 "rotationKeys": [did_key.clone()],
396 "verificationMethods": {
397 "atproto": did_key.clone()
398 },
399 "alsoKnownAs": [format!("at://{}", handle)],
400 "services": {
401 "atproto_pds": {
402 "type": "AtprotoPersonalDataServer",
403 "endpoint": pds_endpoint
404 }
405 },
406 "prev": "bafyreiabc123",
407 "sig": "test_signature_base64"
408 });
409 let submit_res = client
410 .post(format!(
411 "{}/xrpc/com.atproto.identity.submitPlcOperation",
412 base_url().await
413 ))
414 .bearer_auth(&token)
415 .json(&json!({ "operation": operation }))
416 .send()
417 .await
418 .expect("Submit request failed");
419 let submit_status = submit_res.status();
420 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
421 assert_eq!(
422 submit_status,
423 StatusCode::OK,
424 "Submit PLC operation should succeed. Response: {:?}",
425 submit_body
426 );
427}
428
429#[tokio::test]
430async fn test_submit_plc_operation_wrong_endpoint_rejected() {
431 let client = client();
432 let (token, did) = create_account_and_login(&client).await;
433 let key_bytes = get_user_signing_key(&did)
434 .await
435 .expect("Failed to get user signing key");
436 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
437 let handle = get_user_handle(&did)
438 .await
439 .expect("Failed to get user handle");
440 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
441 let pds_endpoint = format!("https://{}", hostname);
442 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
443 unsafe {
444 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
445 }
446 let did_key = signing_key_to_did_key(&signing_key);
447 let operation = json!({
448 "type": "plc_operation",
449 "rotationKeys": [did_key.clone()],
450 "verificationMethods": {
451 "atproto": did_key.clone()
452 },
453 "alsoKnownAs": [format!("at://{}", handle)],
454 "services": {
455 "atproto_pds": {
456 "type": "AtprotoPersonalDataServer",
457 "endpoint": "https://wrong-pds.example.com"
458 }
459 },
460 "prev": "bafyreiabc123",
461 "sig": "test_signature_base64"
462 });
463 let submit_res = client
464 .post(format!(
465 "{}/xrpc/com.atproto.identity.submitPlcOperation",
466 base_url().await
467 ))
468 .bearer_auth(&token)
469 .json(&json!({ "operation": operation }))
470 .send()
471 .await
472 .expect("Submit request failed");
473 assert_eq!(
474 submit_res.status(),
475 StatusCode::BAD_REQUEST,
476 "Submit with wrong endpoint should fail"
477 );
478 let body: Value = submit_res.json().await.unwrap();
479 assert_eq!(body["error"], "InvalidRequest");
480}
481
482#[tokio::test]
483async fn test_submit_plc_operation_wrong_signing_key_rejected() {
484 let client = client();
485 let (token, did) = create_account_and_login(&client).await;
486 let key_bytes = get_user_signing_key(&did)
487 .await
488 .expect("Failed to get user signing key");
489 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
490 let handle = get_user_handle(&did)
491 .await
492 .expect("Failed to get user handle");
493 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
494 let pds_endpoint = format!("https://{}", hostname);
495 let mock_plc = setup_mock_plc_for_submit(&did, &handle, &signing_key, &pds_endpoint).await;
496 unsafe {
497 std::env::set_var("PLC_DIRECTORY_URL", mock_plc.uri());
498 }
499 let wrong_key = SigningKey::random(&mut rand::thread_rng());
500 let wrong_did_key = signing_key_to_did_key(&wrong_key);
501 let correct_did_key = signing_key_to_did_key(&signing_key);
502 let operation = json!({
503 "type": "plc_operation",
504 "rotationKeys": [correct_did_key.clone()],
505 "verificationMethods": {
506 "atproto": wrong_did_key
507 },
508 "alsoKnownAs": [format!("at://{}", handle)],
509 "services": {
510 "atproto_pds": {
511 "type": "AtprotoPersonalDataServer",
512 "endpoint": pds_endpoint
513 }
514 },
515 "prev": "bafyreiabc123",
516 "sig": "test_signature_base64"
517 });
518 let submit_res = client
519 .post(format!(
520 "{}/xrpc/com.atproto.identity.submitPlcOperation",
521 base_url().await
522 ))
523 .bearer_auth(&token)
524 .json(&json!({ "operation": operation }))
525 .send()
526 .await
527 .expect("Submit request failed");
528 assert_eq!(
529 submit_res.status(),
530 StatusCode::BAD_REQUEST,
531 "Submit with wrong signing key should fail"
532 );
533 let body: Value = submit_res.json().await.unwrap();
534 assert_eq!(body["error"], "InvalidRequest");
535}
536
537#[tokio::test]
538async fn test_full_sign_and_submit_flow() {
539 let client = client();
540 let (token, did) = create_account_and_login(&client).await;
541 let key_bytes = get_user_signing_key(&did)
542 .await
543 .expect("Failed to get user signing key");
544 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
545 let handle = get_user_handle(&did)
546 .await
547 .expect("Failed to get user handle");
548 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
549 let pds_endpoint = format!("https://{}", hostname);
550 let request_res = client
551 .post(format!(
552 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
553 base_url().await
554 ))
555 .bearer_auth(&token)
556 .send()
557 .await
558 .expect("Request failed");
559 assert_eq!(request_res.status(), StatusCode::OK);
560 let plc_token = get_plc_token_from_db(&did)
561 .await
562 .expect("PLC token not found");
563 let mock_server = MockServer::start().await;
564 let did_encoded = urlencoding::encode(&did);
565 let did_key = signing_key_to_did_key(&signing_key);
566 let last_op = json!({
567 "type": "plc_operation",
568 "rotationKeys": [did_key.clone()],
569 "verificationMethods": {
570 "atproto": did_key.clone()
571 },
572 "alsoKnownAs": [format!("at://{}", handle)],
573 "services": {
574 "atproto_pds": {
575 "type": "AtprotoPersonalDataServer",
576 "endpoint": pds_endpoint.clone()
577 }
578 },
579 "prev": null,
580 "sig": "initial_sig"
581 });
582 Mock::given(method("GET"))
583 .and(path(format!("/{}/log/last", did_encoded)))
584 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
585 .mount(&mock_server)
586 .await;
587 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
588 Mock::given(method("GET"))
589 .and(path(format!("/{}", did_encoded)))
590 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
591 .mount(&mock_server)
592 .await;
593 Mock::given(method("POST"))
594 .and(path(format!("/{}", did_encoded)))
595 .respond_with(ResponseTemplate::new(200))
596 .expect(1)
597 .mount(&mock_server)
598 .await;
599 unsafe {
600 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
601 }
602 let sign_res = client
603 .post(format!(
604 "{}/xrpc/com.atproto.identity.signPlcOperation",
605 base_url().await
606 ))
607 .bearer_auth(&token)
608 .json(&json!({ "token": plc_token }))
609 .send()
610 .await
611 .expect("Sign failed");
612 assert_eq!(sign_res.status(), StatusCode::OK);
613 let sign_body: Value = sign_res.json().await.unwrap();
614 let signed_operation = sign_body
615 .get("operation")
616 .expect("Response should contain operation")
617 .clone();
618 assert!(signed_operation.get("sig").is_some());
619 assert!(signed_operation.get("prev").is_some());
620 let submit_res = client
621 .post(format!(
622 "{}/xrpc/com.atproto.identity.submitPlcOperation",
623 base_url().await
624 ))
625 .bearer_auth(&token)
626 .json(&json!({ "operation": signed_operation }))
627 .send()
628 .await
629 .expect("Submit failed");
630 let submit_status = submit_res.status();
631 let submit_body: Value = submit_res.json().await.unwrap_or(json!({}));
632 assert_eq!(
633 submit_status,
634 StatusCode::OK,
635 "Full sign and submit flow should succeed. Response: {:?}",
636 submit_body
637 );
638}
639
640#[tokio::test]
641#[ignore = "requires exclusive env var access; run with: cargo test test_cross_pds_migration_with_records -- --ignored --test-threads=1"]
642async fn test_cross_pds_migration_with_records() {
643 let client = client();
644 let (token, did) = create_account_and_login(&client).await;
645 let key_bytes = get_user_signing_key(&did)
646 .await
647 .expect("Failed to get user signing key");
648 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
649 let handle = get_user_handle(&did)
650 .await
651 .expect("Failed to get user handle");
652 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
653 let pds_endpoint = format!("https://{}", hostname);
654 let post_payload = json!({
655 "repo": did,
656 "collection": "app.bsky.feed.post",
657 "record": {
658 "$type": "app.bsky.feed.post",
659 "text": "Test post before migration",
660 "createdAt": chrono::Utc::now().to_rfc3339(),
661 }
662 });
663 let create_res = client
664 .post(format!(
665 "{}/xrpc/com.atproto.repo.createRecord",
666 base_url().await
667 ))
668 .bearer_auth(&token)
669 .json(&post_payload)
670 .send()
671 .await
672 .expect("Failed to create post");
673 assert_eq!(create_res.status(), StatusCode::OK);
674 let create_body: Value = create_res.json().await.unwrap();
675 let original_uri = create_body["uri"].as_str().unwrap().to_string();
676 let export_res = client
677 .get(format!(
678 "{}/xrpc/com.atproto.sync.getRepo?did={}",
679 base_url().await,
680 did
681 ))
682 .send()
683 .await
684 .expect("Export failed");
685 assert_eq!(export_res.status(), StatusCode::OK);
686 let car_bytes = export_res.bytes().await.unwrap();
687 assert!(
688 car_bytes.len() > 100,
689 "CAR file should have meaningful content"
690 );
691 let mock_server = MockServer::start().await;
692 let did_encoded = urlencoding::encode(&did);
693 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
694 Mock::given(method("GET"))
695 .and(path(format!("/{}", did_encoded)))
696 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
697 .mount(&mock_server)
698 .await;
699 unsafe {
700 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
701 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
702 }
703 let import_res = client
704 .post(format!(
705 "{}/xrpc/com.atproto.repo.importRepo",
706 base_url().await
707 ))
708 .bearer_auth(&token)
709 .header("Content-Type", "application/vnd.ipld.car")
710 .body(car_bytes.to_vec())
711 .send()
712 .await
713 .expect("Import failed");
714 let import_status = import_res.status();
715 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
716 unsafe {
717 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
718 }
719 assert_eq!(
720 import_status,
721 StatusCode::OK,
722 "Import with valid DID document should succeed. Response: {:?}",
723 import_body
724 );
725 let get_record_res = client
726 .get(format!(
727 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.feed.post&rkey={}",
728 base_url().await,
729 did,
730 original_uri.split('/').next_back().unwrap()
731 ))
732 .send()
733 .await
734 .expect("Get record failed");
735 assert_eq!(
736 get_record_res.status(),
737 StatusCode::OK,
738 "Record should be retrievable after import"
739 );
740 let record_body: Value = get_record_res.json().await.unwrap();
741 assert_eq!(
742 record_body["value"]["text"], "Test post before migration",
743 "Record content should match"
744 );
745}
746
747#[tokio::test]
748async fn test_migration_rejects_wrong_did_document() {
749 let client = client();
750 let (token, did) = create_account_and_login(&client).await;
751 let wrong_signing_key = SigningKey::random(&mut rand::thread_rng());
752 let handle = get_user_handle(&did)
753 .await
754 .expect("Failed to get user handle");
755 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
756 let pds_endpoint = format!("https://{}", hostname);
757 let export_res = client
758 .get(format!(
759 "{}/xrpc/com.atproto.sync.getRepo?did={}",
760 base_url().await,
761 did
762 ))
763 .send()
764 .await
765 .expect("Export failed");
766 assert_eq!(export_res.status(), StatusCode::OK);
767 let car_bytes = export_res.bytes().await.unwrap();
768 let mock_server = MockServer::start().await;
769 let did_encoded = urlencoding::encode(&did);
770 let wrong_did_doc = create_did_document(&did, &handle, &wrong_signing_key, &pds_endpoint);
771 Mock::given(method("GET"))
772 .and(path(format!("/{}", did_encoded)))
773 .respond_with(ResponseTemplate::new(200).set_body_json(wrong_did_doc))
774 .mount(&mock_server)
775 .await;
776 unsafe {
777 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
778 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
779 }
780 let import_res = client
781 .post(format!(
782 "{}/xrpc/com.atproto.repo.importRepo",
783 base_url().await
784 ))
785 .bearer_auth(&token)
786 .header("Content-Type", "application/vnd.ipld.car")
787 .body(car_bytes.to_vec())
788 .send()
789 .await
790 .expect("Import failed");
791 let import_status = import_res.status();
792 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
793 unsafe {
794 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
795 }
796 assert_eq!(
797 import_status,
798 StatusCode::BAD_REQUEST,
799 "Import with wrong DID document should fail. Response: {:?}",
800 import_body
801 );
802 assert!(
803 import_body["error"] == "InvalidSignature"
804 || import_body["message"]
805 .as_str()
806 .unwrap_or("")
807 .contains("signature"),
808 "Error should mention signature verification failure"
809 );
810}
811
812#[tokio::test]
813#[ignore = "requires exclusive env var access; run with: cargo test test_full_migration_flow_end_to_end -- --ignored --test-threads=1"]
814async fn test_full_migration_flow_end_to_end() {
815 let client = client();
816 let (token, did) = create_account_and_login(&client).await;
817 let key_bytes = get_user_signing_key(&did)
818 .await
819 .expect("Failed to get user signing key");
820 let signing_key = SigningKey::from_slice(&key_bytes).expect("Failed to create signing key");
821 let handle = get_user_handle(&did)
822 .await
823 .expect("Failed to get user handle");
824 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
825 let pds_endpoint = format!("https://{}", hostname);
826 let did_key = signing_key_to_did_key(&signing_key);
827 for i in 0..3 {
828 let post_payload = json!({
829 "repo": did,
830 "collection": "app.bsky.feed.post",
831 "record": {
832 "$type": "app.bsky.feed.post",
833 "text": format!("Pre-migration post #{}", i),
834 "createdAt": chrono::Utc::now().to_rfc3339(),
835 }
836 });
837 let res = client
838 .post(format!(
839 "{}/xrpc/com.atproto.repo.createRecord",
840 base_url().await
841 ))
842 .bearer_auth(&token)
843 .json(&post_payload)
844 .send()
845 .await
846 .expect("Failed to create post");
847 assert_eq!(res.status(), StatusCode::OK);
848 }
849 let request_res = client
850 .post(format!(
851 "{}/xrpc/com.atproto.identity.requestPlcOperationSignature",
852 base_url().await
853 ))
854 .bearer_auth(&token)
855 .send()
856 .await
857 .expect("Request failed");
858 assert_eq!(request_res.status(), StatusCode::OK);
859 let plc_token = get_plc_token_from_db(&did)
860 .await
861 .expect("PLC token not found");
862 let mock_server = MockServer::start().await;
863 let did_encoded = urlencoding::encode(&did);
864 let last_op = json!({
865 "type": "plc_operation",
866 "rotationKeys": [did_key.clone()],
867 "verificationMethods": { "atproto": did_key.clone() },
868 "alsoKnownAs": [format!("at://{}", handle)],
869 "services": {
870 "atproto_pds": {
871 "type": "AtprotoPersonalDataServer",
872 "endpoint": pds_endpoint.clone()
873 }
874 },
875 "prev": null,
876 "sig": "initial_sig"
877 });
878 Mock::given(method("GET"))
879 .and(path(format!("/{}/log/last", did_encoded)))
880 .respond_with(ResponseTemplate::new(200).set_body_json(last_op))
881 .mount(&mock_server)
882 .await;
883 let did_doc = create_did_document(&did, &handle, &signing_key, &pds_endpoint);
884 Mock::given(method("GET"))
885 .and(path(format!("/{}", did_encoded)))
886 .respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
887 .mount(&mock_server)
888 .await;
889 Mock::given(method("POST"))
890 .and(path(format!("/{}", did_encoded)))
891 .respond_with(ResponseTemplate::new(200))
892 .expect(1)
893 .mount(&mock_server)
894 .await;
895 unsafe {
896 std::env::set_var("PLC_DIRECTORY_URL", mock_server.uri());
897 }
898 let sign_res = client
899 .post(format!(
900 "{}/xrpc/com.atproto.identity.signPlcOperation",
901 base_url().await
902 ))
903 .bearer_auth(&token)
904 .json(&json!({ "token": plc_token }))
905 .send()
906 .await
907 .expect("Sign failed");
908 assert_eq!(sign_res.status(), StatusCode::OK);
909 let sign_body: Value = sign_res.json().await.unwrap();
910 let signed_op = sign_body.get("operation").unwrap().clone();
911 let export_res = client
912 .get(format!(
913 "{}/xrpc/com.atproto.sync.getRepo?did={}",
914 base_url().await,
915 did
916 ))
917 .send()
918 .await
919 .expect("Export failed");
920 assert_eq!(export_res.status(), StatusCode::OK);
921 let car_bytes = export_res.bytes().await.unwrap();
922 let submit_res = client
923 .post(format!(
924 "{}/xrpc/com.atproto.identity.submitPlcOperation",
925 base_url().await
926 ))
927 .bearer_auth(&token)
928 .json(&json!({ "operation": signed_op }))
929 .send()
930 .await
931 .expect("Submit failed");
932 assert_eq!(submit_res.status(), StatusCode::OK);
933 unsafe {
934 std::env::remove_var("SKIP_IMPORT_VERIFICATION");
935 }
936 let import_res = client
937 .post(format!(
938 "{}/xrpc/com.atproto.repo.importRepo",
939 base_url().await
940 ))
941 .bearer_auth(&token)
942 .header("Content-Type", "application/vnd.ipld.car")
943 .body(car_bytes.to_vec())
944 .send()
945 .await
946 .expect("Import failed");
947 let import_status = import_res.status();
948 let import_body: Value = import_res.json().await.unwrap_or(json!({}));
949 unsafe {
950 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
951 }
952 assert_eq!(
953 import_status,
954 StatusCode::OK,
955 "Full migration flow should succeed. Response: {:?}",
956 import_body
957 );
958 let list_res = client
959 .get(format!(
960 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection=app.bsky.feed.post",
961 base_url().await,
962 did
963 ))
964 .send()
965 .await
966 .expect("List failed");
967 assert_eq!(list_res.status(), StatusCode::OK);
968 let list_body: Value = list_res.json().await.unwrap();
969 let records = list_body["records"]
970 .as_array()
971 .expect("Should have records array");
972 assert!(
973 !records.is_empty(),
974 "Should have at least 1 record after migration, found {}",
975 records.len()
976 );
977}