+3
-3
TODO.md
+3
-3
TODO.md
···
68
68
- [ ] Broadcast real-time commit events.
69
69
- [ ] Handle cursor replay (backfill).
70
70
- [ ] Bulk Export
71
-
- [ ] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo).
72
-
- [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
71
+
- [x] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo).
72
+
- [x] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
73
73
- [x] Implement `com.atproto.sync.getLatestCommit`.
74
-
- [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
74
+
- [x] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
75
75
- [x] Implement `com.atproto.sync.getRepoStatus`.
76
76
- [x] Implement `com.atproto.sync.listRepos`.
77
77
- [x] Implement `com.atproto.sync.notifyOfUpdate`.
+12
src/lib.rs
+12
src/lib.rs
···
119
119
post(sync::request_crawl),
120
120
)
121
121
.route(
122
+
"/xrpc/com.atproto.sync.getBlocks",
123
+
get(sync::get_blocks),
124
+
)
125
+
.route(
126
+
"/xrpc/com.atproto.sync.getRepo",
127
+
get(sync::get_repo),
128
+
)
129
+
.route(
130
+
"/xrpc/com.atproto.sync.getRecord",
131
+
get(sync::get_record),
132
+
)
133
+
.route(
122
134
"/xrpc/com.atproto.moderation.createReport",
123
135
post(api::moderation::create_report),
124
136
)
+574
-1
src/sync/mod.rs
+574
-1
src/sync/mod.rs
···
7
7
http::header,
8
8
response::{IntoResponse, Response},
9
9
};
10
+
use bytes::Bytes;
11
+
use cid::Cid;
12
+
use jacquard_repo::{commit::Commit, storage::BlockStore};
10
13
use serde::{Deserialize, Serialize};
11
14
use serde_json::json;
15
+
use std::collections::HashSet;
16
+
use std::io::Write;
12
17
use tracing::{error, info};
18
+
19
+
fn write_varint<W: Write>(mut writer: W, mut value: u64) -> std::io::Result<()> {
20
+
loop {
21
+
let mut byte = (value & 0x7F) as u8;
22
+
value >>= 7;
23
+
if value != 0 {
24
+
byte |= 0x80;
25
+
}
26
+
writer.write_all(&[byte])?;
27
+
if value == 0 {
28
+
break;
29
+
}
30
+
}
31
+
Ok(())
32
+
}
33
+
34
+
fn ld_write<W: Write>(mut writer: W, data: &[u8]) -> std::io::Result<()> {
35
+
write_varint(&mut writer, data.len() as u64)?;
36
+
writer.write_all(data)?;
37
+
Ok(())
38
+
}
39
+
40
+
fn encode_car_header(root_cid: &Cid) -> Vec<u8> {
41
+
let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({
42
+
"version": 1u64,
43
+
"roots": [root_cid.to_bytes()]
44
+
}))
45
+
.unwrap_or_default();
46
+
header
47
+
}
13
48
14
49
#[derive(Deserialize)]
15
50
pub struct GetLatestCommitParams {
···
471
506
Json(input): Json<RequestCrawlInput>,
472
507
) -> Response {
473
508
info!("Received requestCrawl for hostname: {}", input.hostname);
474
-
// TODO: Queue job for crawling
475
509
info!("TODO: Queue job for requestCrawl (not implemented)");
476
510
477
511
(StatusCode::OK, Json(json!({}))).into_response()
478
512
}
513
+
514
+
#[derive(Deserialize)]
515
+
pub struct GetBlocksParams {
516
+
pub did: String,
517
+
pub cids: String,
518
+
}
519
+
520
+
pub async fn get_blocks(
521
+
State(state): State<AppState>,
522
+
Query(params): Query<GetBlocksParams>,
523
+
) -> Response {
524
+
let did = params.did.trim();
525
+
526
+
if did.is_empty() {
527
+
return (
528
+
StatusCode::BAD_REQUEST,
529
+
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
530
+
)
531
+
.into_response();
532
+
}
533
+
534
+
let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
535
+
536
+
if cid_strings.is_empty() {
537
+
return (
538
+
StatusCode::BAD_REQUEST,
539
+
Json(json!({"error": "InvalidRequest", "message": "cids is required"})),
540
+
)
541
+
.into_response();
542
+
}
543
+
544
+
let repo_result = sqlx::query!(
545
+
r#"
546
+
SELECT r.repo_root_cid
547
+
FROM repos r
548
+
JOIN users u ON r.user_id = u.id
549
+
WHERE u.did = $1
550
+
"#,
551
+
did
552
+
)
553
+
.fetch_optional(&state.db)
554
+
.await;
555
+
556
+
let repo_root_cid_str = match repo_result {
557
+
Ok(Some(row)) => row.repo_root_cid,
558
+
Ok(None) => {
559
+
return (
560
+
StatusCode::NOT_FOUND,
561
+
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
562
+
)
563
+
.into_response();
564
+
}
565
+
Err(e) => {
566
+
error!("DB error in get_blocks: {:?}", e);
567
+
return (
568
+
StatusCode::INTERNAL_SERVER_ERROR,
569
+
Json(json!({"error": "InternalError"})),
570
+
)
571
+
.into_response();
572
+
}
573
+
};
574
+
575
+
let root_cid = match repo_root_cid_str.parse::<Cid>() {
576
+
Ok(c) => c,
577
+
Err(e) => {
578
+
error!("Failed to parse root CID: {:?}", e);
579
+
return (
580
+
StatusCode::INTERNAL_SERVER_ERROR,
581
+
Json(json!({"error": "InternalError"})),
582
+
)
583
+
.into_response();
584
+
}
585
+
};
586
+
587
+
let mut requested_cids: Vec<Cid> = Vec::new();
588
+
for cid_str in &cid_strings {
589
+
match cid_str.parse::<Cid>() {
590
+
Ok(c) => requested_cids.push(c),
591
+
Err(e) => {
592
+
error!("Failed to parse CID '{}': {:?}", cid_str, e);
593
+
return (
594
+
StatusCode::BAD_REQUEST,
595
+
Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})),
596
+
)
597
+
.into_response();
598
+
}
599
+
}
600
+
}
601
+
602
+
let mut buf = Vec::new();
603
+
let header = encode_car_header(&root_cid);
604
+
if let Err(e) = ld_write(&mut buf, &header) {
605
+
error!("Failed to write CAR header: {:?}", e);
606
+
return (
607
+
StatusCode::INTERNAL_SERVER_ERROR,
608
+
Json(json!({"error": "InternalError"})),
609
+
)
610
+
.into_response();
611
+
}
612
+
613
+
for cid in &requested_cids {
614
+
let cid_bytes = cid.to_bytes();
615
+
let block_result = sqlx::query!(
616
+
"SELECT data FROM blocks WHERE cid = $1",
617
+
&cid_bytes
618
+
)
619
+
.fetch_optional(&state.db)
620
+
.await;
621
+
622
+
match block_result {
623
+
Ok(Some(row)) => {
624
+
let mut block_data = Vec::new();
625
+
block_data.extend_from_slice(&cid_bytes);
626
+
block_data.extend_from_slice(&row.data);
627
+
if let Err(e) = ld_write(&mut buf, &block_data) {
628
+
error!("Failed to write block: {:?}", e);
629
+
return (
630
+
StatusCode::INTERNAL_SERVER_ERROR,
631
+
Json(json!({"error": "InternalError"})),
632
+
)
633
+
.into_response();
634
+
}
635
+
}
636
+
Ok(None) => {
637
+
return (
638
+
StatusCode::NOT_FOUND,
639
+
Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})),
640
+
)
641
+
.into_response();
642
+
}
643
+
Err(e) => {
644
+
error!("DB error fetching block: {:?}", e);
645
+
return (
646
+
StatusCode::INTERNAL_SERVER_ERROR,
647
+
Json(json!({"error": "InternalError"})),
648
+
)
649
+
.into_response();
650
+
}
651
+
}
652
+
}
653
+
654
+
Response::builder()
655
+
.status(StatusCode::OK)
656
+
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
657
+
.body(Body::from(buf))
658
+
.unwrap()
659
+
}
660
+
661
+
#[derive(Deserialize)]
662
+
pub struct GetRepoParams {
663
+
pub did: String,
664
+
pub since: Option<String>,
665
+
}
666
+
667
+
pub async fn get_repo(
668
+
State(state): State<AppState>,
669
+
Query(params): Query<GetRepoParams>,
670
+
) -> Response {
671
+
let did = params.did.trim();
672
+
673
+
if did.is_empty() {
674
+
return (
675
+
StatusCode::BAD_REQUEST,
676
+
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
677
+
)
678
+
.into_response();
679
+
}
680
+
681
+
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
682
+
.fetch_optional(&state.db)
683
+
.await;
684
+
685
+
let user_id = match user_result {
686
+
Ok(Some(row)) => row.id,
687
+
Ok(None) => {
688
+
return (
689
+
StatusCode::NOT_FOUND,
690
+
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
691
+
)
692
+
.into_response();
693
+
}
694
+
Err(e) => {
695
+
error!("DB error in get_repo: {:?}", e);
696
+
return (
697
+
StatusCode::INTERNAL_SERVER_ERROR,
698
+
Json(json!({"error": "InternalError"})),
699
+
)
700
+
.into_response();
701
+
}
702
+
};
703
+
704
+
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
705
+
.fetch_optional(&state.db)
706
+
.await;
707
+
708
+
let repo_root_cid_str = match repo_result {
709
+
Ok(Some(row)) => row.repo_root_cid,
710
+
Ok(None) => {
711
+
return (
712
+
StatusCode::NOT_FOUND,
713
+
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
714
+
)
715
+
.into_response();
716
+
}
717
+
Err(e) => {
718
+
error!("DB error in get_repo: {:?}", e);
719
+
return (
720
+
StatusCode::INTERNAL_SERVER_ERROR,
721
+
Json(json!({"error": "InternalError"})),
722
+
)
723
+
.into_response();
724
+
}
725
+
};
726
+
727
+
let root_cid = match repo_root_cid_str.parse::<Cid>() {
728
+
Ok(c) => c,
729
+
Err(e) => {
730
+
error!("Failed to parse root CID: {:?}", e);
731
+
return (
732
+
StatusCode::INTERNAL_SERVER_ERROR,
733
+
Json(json!({"error": "InternalError"})),
734
+
)
735
+
.into_response();
736
+
}
737
+
};
738
+
739
+
let commit_bytes = match state.block_store.get(&root_cid).await {
740
+
Ok(Some(b)) => b,
741
+
Ok(None) => {
742
+
error!("Commit block not found: {}", root_cid);
743
+
return (
744
+
StatusCode::INTERNAL_SERVER_ERROR,
745
+
Json(json!({"error": "InternalError"})),
746
+
)
747
+
.into_response();
748
+
}
749
+
Err(e) => {
750
+
error!("Failed to load commit block: {:?}", e);
751
+
return (
752
+
StatusCode::INTERNAL_SERVER_ERROR,
753
+
Json(json!({"error": "InternalError"})),
754
+
)
755
+
.into_response();
756
+
}
757
+
};
758
+
759
+
let commit = match Commit::from_cbor(&commit_bytes) {
760
+
Ok(c) => c,
761
+
Err(e) => {
762
+
error!("Failed to parse commit: {:?}", e);
763
+
return (
764
+
StatusCode::INTERNAL_SERVER_ERROR,
765
+
Json(json!({"error": "InternalError"})),
766
+
)
767
+
.into_response();
768
+
}
769
+
};
770
+
771
+
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
772
+
let mut visited: HashSet<Vec<u8>> = HashSet::new();
773
+
774
+
collected_blocks.push((root_cid, commit_bytes.clone()));
775
+
visited.insert(root_cid.to_bytes());
776
+
777
+
let mst_root_cid = commit.data;
778
+
if !visited.contains(&mst_root_cid.to_bytes()) {
779
+
visited.insert(mst_root_cid.to_bytes());
780
+
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
781
+
collected_blocks.push((mst_root_cid, data));
782
+
}
783
+
}
784
+
785
+
let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id)
786
+
.fetch_all(&state.db)
787
+
.await
788
+
.unwrap_or_default();
789
+
790
+
for record in records {
791
+
if let Ok(cid) = record.record_cid.parse::<Cid>() {
792
+
if !visited.contains(&cid.to_bytes()) {
793
+
visited.insert(cid.to_bytes());
794
+
if let Ok(Some(data)) = state.block_store.get(&cid).await {
795
+
collected_blocks.push((cid, data));
796
+
}
797
+
}
798
+
}
799
+
}
800
+
801
+
let mut buf = Vec::new();
802
+
let header = encode_car_header(&root_cid);
803
+
if let Err(e) = ld_write(&mut buf, &header) {
804
+
error!("Failed to write CAR header: {:?}", e);
805
+
return (
806
+
StatusCode::INTERNAL_SERVER_ERROR,
807
+
Json(json!({"error": "InternalError"})),
808
+
)
809
+
.into_response();
810
+
}
811
+
812
+
for (cid, data) in &collected_blocks {
813
+
let mut block_data = Vec::new();
814
+
block_data.extend_from_slice(&cid.to_bytes());
815
+
block_data.extend_from_slice(data);
816
+
if let Err(e) = ld_write(&mut buf, &block_data) {
817
+
error!("Failed to write block: {:?}", e);
818
+
return (
819
+
StatusCode::INTERNAL_SERVER_ERROR,
820
+
Json(json!({"error": "InternalError"})),
821
+
)
822
+
.into_response();
823
+
}
824
+
}
825
+
826
+
Response::builder()
827
+
.status(StatusCode::OK)
828
+
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
829
+
.body(Body::from(buf))
830
+
.unwrap()
831
+
}
832
+
833
+
#[derive(Deserialize)]
834
+
pub struct GetRecordParams {
835
+
pub did: String,
836
+
pub collection: String,
837
+
pub rkey: String,
838
+
}
839
+
840
+
pub async fn get_record(
841
+
State(state): State<AppState>,
842
+
Query(params): Query<GetRecordParams>,
843
+
) -> Response {
844
+
let did = params.did.trim();
845
+
let collection = params.collection.trim();
846
+
let rkey = params.rkey.trim();
847
+
848
+
if did.is_empty() {
849
+
return (
850
+
StatusCode::BAD_REQUEST,
851
+
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
852
+
)
853
+
.into_response();
854
+
}
855
+
856
+
if collection.is_empty() {
857
+
return (
858
+
StatusCode::BAD_REQUEST,
859
+
Json(json!({"error": "InvalidRequest", "message": "collection is required"})),
860
+
)
861
+
.into_response();
862
+
}
863
+
864
+
if rkey.is_empty() {
865
+
return (
866
+
StatusCode::BAD_REQUEST,
867
+
Json(json!({"error": "InvalidRequest", "message": "rkey is required"})),
868
+
)
869
+
.into_response();
870
+
}
871
+
872
+
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
873
+
.fetch_optional(&state.db)
874
+
.await;
875
+
876
+
let user_id = match user_result {
877
+
Ok(Some(row)) => row.id,
878
+
Ok(None) => {
879
+
return (
880
+
StatusCode::NOT_FOUND,
881
+
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
882
+
)
883
+
.into_response();
884
+
}
885
+
Err(e) => {
886
+
error!("DB error in sync get_record: {:?}", e);
887
+
return (
888
+
StatusCode::INTERNAL_SERVER_ERROR,
889
+
Json(json!({"error": "InternalError"})),
890
+
)
891
+
.into_response();
892
+
}
893
+
};
894
+
895
+
let record_result = sqlx::query!(
896
+
"SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
897
+
user_id,
898
+
collection,
899
+
rkey
900
+
)
901
+
.fetch_optional(&state.db)
902
+
.await;
903
+
904
+
let record_cid_str = match record_result {
905
+
Ok(Some(row)) => row.record_cid,
906
+
Ok(None) => {
907
+
return (
908
+
StatusCode::NOT_FOUND,
909
+
Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
910
+
)
911
+
.into_response();
912
+
}
913
+
Err(e) => {
914
+
error!("DB error in sync get_record: {:?}", e);
915
+
return (
916
+
StatusCode::INTERNAL_SERVER_ERROR,
917
+
Json(json!({"error": "InternalError"})),
918
+
)
919
+
.into_response();
920
+
}
921
+
};
922
+
923
+
let record_cid = match record_cid_str.parse::<Cid>() {
924
+
Ok(c) => c,
925
+
Err(e) => {
926
+
error!("Failed to parse record CID: {:?}", e);
927
+
return (
928
+
StatusCode::INTERNAL_SERVER_ERROR,
929
+
Json(json!({"error": "InternalError"})),
930
+
)
931
+
.into_response();
932
+
}
933
+
};
934
+
935
+
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
936
+
.fetch_optional(&state.db)
937
+
.await;
938
+
939
+
let repo_root_cid_str = match repo_result {
940
+
Ok(Some(row)) => row.repo_root_cid,
941
+
Ok(None) => {
942
+
return (
943
+
StatusCode::NOT_FOUND,
944
+
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
945
+
)
946
+
.into_response();
947
+
}
948
+
Err(e) => {
949
+
error!("DB error in sync get_record: {:?}", e);
950
+
return (
951
+
StatusCode::INTERNAL_SERVER_ERROR,
952
+
Json(json!({"error": "InternalError"})),
953
+
)
954
+
.into_response();
955
+
}
956
+
};
957
+
958
+
let root_cid = match repo_root_cid_str.parse::<Cid>() {
959
+
Ok(c) => c,
960
+
Err(e) => {
961
+
error!("Failed to parse root CID: {:?}", e);
962
+
return (
963
+
StatusCode::INTERNAL_SERVER_ERROR,
964
+
Json(json!({"error": "InternalError"})),
965
+
)
966
+
.into_response();
967
+
}
968
+
};
969
+
970
+
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
971
+
972
+
let commit_bytes = match state.block_store.get(&root_cid).await {
973
+
Ok(Some(b)) => b,
974
+
Ok(None) => {
975
+
error!("Commit block not found: {}", root_cid);
976
+
return (
977
+
StatusCode::INTERNAL_SERVER_ERROR,
978
+
Json(json!({"error": "InternalError"})),
979
+
)
980
+
.into_response();
981
+
}
982
+
Err(e) => {
983
+
error!("Failed to load commit block: {:?}", e);
984
+
return (
985
+
StatusCode::INTERNAL_SERVER_ERROR,
986
+
Json(json!({"error": "InternalError"})),
987
+
)
988
+
.into_response();
989
+
}
990
+
};
991
+
992
+
collected_blocks.push((root_cid, commit_bytes.clone()));
993
+
994
+
let commit = match Commit::from_cbor(&commit_bytes) {
995
+
Ok(c) => c,
996
+
Err(e) => {
997
+
error!("Failed to parse commit: {:?}", e);
998
+
return (
999
+
StatusCode::INTERNAL_SERVER_ERROR,
1000
+
Json(json!({"error": "InternalError"})),
1001
+
)
1002
+
.into_response();
1003
+
}
1004
+
};
1005
+
1006
+
let mst_root_cid = commit.data;
1007
+
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
1008
+
collected_blocks.push((mst_root_cid, data));
1009
+
}
1010
+
1011
+
if let Ok(Some(data)) = state.block_store.get(&record_cid).await {
1012
+
collected_blocks.push((record_cid, data));
1013
+
} else {
1014
+
return (
1015
+
StatusCode::NOT_FOUND,
1016
+
Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
1017
+
)
1018
+
.into_response();
1019
+
}
1020
+
1021
+
let mut buf = Vec::new();
1022
+
let header = encode_car_header(&root_cid);
1023
+
if let Err(e) = ld_write(&mut buf, &header) {
1024
+
error!("Failed to write CAR header: {:?}", e);
1025
+
return (
1026
+
StatusCode::INTERNAL_SERVER_ERROR,
1027
+
Json(json!({"error": "InternalError"})),
1028
+
)
1029
+
.into_response();
1030
+
}
1031
+
1032
+
for (cid, data) in &collected_blocks {
1033
+
let mut block_data = Vec::new();
1034
+
block_data.extend_from_slice(&cid.to_bytes());
1035
+
block_data.extend_from_slice(data);
1036
+
if let Err(e) = ld_write(&mut buf, &block_data) {
1037
+
error!("Failed to write block: {:?}", e);
1038
+
return (
1039
+
StatusCode::INTERNAL_SERVER_ERROR,
1040
+
Json(json!({"error": "InternalError"})),
1041
+
)
1042
+
.into_response();
1043
+
}
1044
+
}
1045
+
1046
+
Response::builder()
1047
+
.status(StatusCode::OK)
1048
+
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
1049
+
.body(Body::from(buf))
1050
+
.unwrap()
1051
+
}
+7
-3
tests/common/mod.rs
+7
-3
tests/common/mod.rs
···
184
184
.await
185
185
.expect("Failed to run migrations");
186
186
187
-
let state = AppState::new(pool).await;
188
-
let app = bspds::app(state);
189
-
190
187
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
191
188
let addr = listener.local_addr().unwrap();
189
+
190
+
unsafe {
191
+
std::env::set_var("PDS_HOSTNAME", addr.to_string());
192
+
}
193
+
194
+
let state = AppState::new(pool).await;
195
+
let app = bspds::app(state);
192
196
193
197
tokio::spawn(async move {
194
198
axum::serve(listener, app).await.unwrap();
+25
-2
tests/identity.rs
+25
-2
tests/identity.rs
···
106
106
107
107
let handle = format!("webuser_{}", uuid::Uuid::new_v4());
108
108
109
-
let pds_endpoint = "https://localhost";
109
+
let pds_endpoint = base_url().await.replace("http://", "https://");
110
110
111
111
let did_doc = json!({
112
112
"@context": ["https://www.w3.org/ns/did/v1"],
···
211
211
#[tokio::test]
212
212
async fn test_did_web_lifecycle() {
213
213
let client = client();
214
+
215
+
let mock_server = MockServer::start().await;
216
+
let mock_uri = mock_server.uri();
217
+
let mock_addr = mock_uri.trim_start_matches("http://");
218
+
214
219
let handle = format!("lifecycle_{}", uuid::Uuid::new_v4());
215
-
let did = format!("did:web:localhost:u:{}", handle);
220
+
let did = format!("did:web:{}:u:{}", mock_addr.replace(":", "%3A"), handle);
216
221
let email = format!("{}@test.com", handle);
222
+
223
+
let pds_endpoint = base_url().await.replace("http://", "https://");
224
+
225
+
let did_doc = json!({
226
+
"@context": ["https://www.w3.org/ns/did/v1"],
227
+
"id": did,
228
+
"service": [{
229
+
"id": "#atproto_pds",
230
+
"type": "AtprotoPersonalDataServer",
231
+
"serviceEndpoint": pds_endpoint
232
+
}]
233
+
});
234
+
235
+
Mock::given(method("GET"))
236
+
.and(path(format!("/u/{}/did.json", handle)))
237
+
.respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
238
+
.mount(&mock_server)
239
+
.await;
217
240
218
241
let create_payload = json!({
219
242
"handle": handle,
+570
tests/lifecycle.rs
+570
tests/lifecycle.rs
···
1
1
mod common;
2
2
use common::*;
3
3
4
+
use base64::Engine;
4
5
use chrono::Utc;
5
6
use reqwest::{self, StatusCode, header};
6
7
use serde_json::{Value, json};
···
1842
1843
let (new_post_uri, _) = create_post(&client, &did, &jwt, "Post after reactivation").await;
1843
1844
assert!(!new_post_uri.is_empty(), "Should be able to post after reactivation");
1844
1845
}
1846
+
1847
+
#[tokio::test]
1848
+
async fn test_sync_record_lifecycle() {
1849
+
let client = client();
1850
+
let (did, jwt) = setup_new_user("sync-record-lifecycle").await;
1851
+
1852
+
let (post_uri, _post_cid) =
1853
+
create_post(&client, &did, &jwt, "Post for sync record test").await;
1854
+
let post_rkey = post_uri.split('/').last().unwrap();
1855
+
1856
+
let sync_record_res = client
1857
+
.get(format!(
1858
+
"{}/xrpc/com.atproto.sync.getRecord",
1859
+
base_url().await
1860
+
))
1861
+
.query(&[
1862
+
("did", did.as_str()),
1863
+
("collection", "app.bsky.feed.post"),
1864
+
("rkey", post_rkey),
1865
+
])
1866
+
.send()
1867
+
.await
1868
+
.expect("Failed to get sync record");
1869
+
1870
+
assert_eq!(sync_record_res.status(), StatusCode::OK);
1871
+
assert_eq!(
1872
+
sync_record_res
1873
+
.headers()
1874
+
.get("content-type")
1875
+
.and_then(|h| h.to_str().ok()),
1876
+
Some("application/vnd.ipld.car")
1877
+
);
1878
+
let car_bytes = sync_record_res.bytes().await.unwrap();
1879
+
assert!(!car_bytes.is_empty(), "CAR data should not be empty");
1880
+
1881
+
let latest_before = client
1882
+
.get(format!(
1883
+
"{}/xrpc/com.atproto.sync.getLatestCommit",
1884
+
base_url().await
1885
+
))
1886
+
.query(&[("did", did.as_str())])
1887
+
.send()
1888
+
.await
1889
+
.expect("Failed to get latest commit");
1890
+
let latest_before_body: Value = latest_before.json().await.unwrap();
1891
+
let rev_before = latest_before_body["rev"].as_str().unwrap().to_string();
1892
+
1893
+
let (post2_uri, _) = create_post(&client, &did, &jwt, "Second post for sync test").await;
1894
+
1895
+
let latest_after = client
1896
+
.get(format!(
1897
+
"{}/xrpc/com.atproto.sync.getLatestCommit",
1898
+
base_url().await
1899
+
))
1900
+
.query(&[("did", did.as_str())])
1901
+
.send()
1902
+
.await
1903
+
.expect("Failed to get latest commit after");
1904
+
let latest_after_body: Value = latest_after.json().await.unwrap();
1905
+
let rev_after = latest_after_body["rev"].as_str().unwrap().to_string();
1906
+
assert_ne!(rev_before, rev_after, "Revision should change after new record");
1907
+
1908
+
let delete_payload = json!({
1909
+
"repo": did,
1910
+
"collection": "app.bsky.feed.post",
1911
+
"rkey": post_rkey
1912
+
});
1913
+
let delete_res = client
1914
+
.post(format!(
1915
+
"{}/xrpc/com.atproto.repo.deleteRecord",
1916
+
base_url().await
1917
+
))
1918
+
.bearer_auth(&jwt)
1919
+
.json(&delete_payload)
1920
+
.send()
1921
+
.await
1922
+
.expect("Failed to delete record");
1923
+
assert_eq!(delete_res.status(), StatusCode::OK);
1924
+
1925
+
let sync_deleted_res = client
1926
+
.get(format!(
1927
+
"{}/xrpc/com.atproto.sync.getRecord",
1928
+
base_url().await
1929
+
))
1930
+
.query(&[
1931
+
("did", did.as_str()),
1932
+
("collection", "app.bsky.feed.post"),
1933
+
("rkey", post_rkey),
1934
+
])
1935
+
.send()
1936
+
.await
1937
+
.expect("Failed to check deleted record via sync");
1938
+
assert_eq!(
1939
+
sync_deleted_res.status(),
1940
+
StatusCode::NOT_FOUND,
1941
+
"Deleted record should return 404 via sync.getRecord"
1942
+
);
1943
+
1944
+
let post2_rkey = post2_uri.split('/').last().unwrap();
1945
+
let sync_post2_res = client
1946
+
.get(format!(
1947
+
"{}/xrpc/com.atproto.sync.getRecord",
1948
+
base_url().await
1949
+
))
1950
+
.query(&[
1951
+
("did", did.as_str()),
1952
+
("collection", "app.bsky.feed.post"),
1953
+
("rkey", post2_rkey),
1954
+
])
1955
+
.send()
1956
+
.await
1957
+
.expect("Failed to get second post via sync");
1958
+
assert_eq!(
1959
+
sync_post2_res.status(),
1960
+
StatusCode::OK,
1961
+
"Second post should still be accessible"
1962
+
);
1963
+
}
1964
+
1965
+
#[tokio::test]
1966
+
async fn test_sync_repo_export_lifecycle() {
1967
+
let client = client();
1968
+
let (did, jwt) = setup_new_user("sync-repo-export").await;
1969
+
1970
+
let profile_payload = json!({
1971
+
"repo": did,
1972
+
"collection": "app.bsky.actor.profile",
1973
+
"rkey": "self",
1974
+
"record": {
1975
+
"$type": "app.bsky.actor.profile",
1976
+
"displayName": "Sync Export User"
1977
+
}
1978
+
});
1979
+
let profile_res = client
1980
+
.post(format!(
1981
+
"{}/xrpc/com.atproto.repo.putRecord",
1982
+
base_url().await
1983
+
))
1984
+
.bearer_auth(&jwt)
1985
+
.json(&profile_payload)
1986
+
.send()
1987
+
.await
1988
+
.expect("Failed to create profile");
1989
+
assert_eq!(profile_res.status(), StatusCode::OK);
1990
+
1991
+
for i in 0..3 {
1992
+
tokio::time::sleep(Duration::from_millis(50)).await;
1993
+
create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await;
1994
+
}
1995
+
1996
+
let blob_data = b"blob data for sync export test";
1997
+
let upload_res = client
1998
+
.post(format!(
1999
+
"{}/xrpc/com.atproto.repo.uploadBlob",
2000
+
base_url().await
2001
+
))
2002
+
.header(header::CONTENT_TYPE, "application/octet-stream")
2003
+
.bearer_auth(&jwt)
2004
+
.body(blob_data.to_vec())
2005
+
.send()
2006
+
.await
2007
+
.expect("Failed to upload blob");
2008
+
assert_eq!(upload_res.status(), StatusCode::OK);
2009
+
let blob_body: Value = upload_res.json().await.unwrap();
2010
+
let blob_cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap().to_string();
2011
+
2012
+
let repo_status_res = client
2013
+
.get(format!(
2014
+
"{}/xrpc/com.atproto.sync.getRepoStatus",
2015
+
base_url().await
2016
+
))
2017
+
.query(&[("did", did.as_str())])
2018
+
.send()
2019
+
.await
2020
+
.expect("Failed to get repo status");
2021
+
assert_eq!(repo_status_res.status(), StatusCode::OK);
2022
+
let status_body: Value = repo_status_res.json().await.unwrap();
2023
+
assert_eq!(status_body["did"], did);
2024
+
assert_eq!(status_body["active"], true);
2025
+
2026
+
let get_repo_res = client
2027
+
.get(format!(
2028
+
"{}/xrpc/com.atproto.sync.getRepo",
2029
+
base_url().await
2030
+
))
2031
+
.query(&[("did", did.as_str())])
2032
+
.send()
2033
+
.await
2034
+
.expect("Failed to get full repo");
2035
+
assert_eq!(get_repo_res.status(), StatusCode::OK);
2036
+
assert_eq!(
2037
+
get_repo_res
2038
+
.headers()
2039
+
.get("content-type")
2040
+
.and_then(|h| h.to_str().ok()),
2041
+
Some("application/vnd.ipld.car")
2042
+
);
2043
+
let repo_car = get_repo_res.bytes().await.unwrap();
2044
+
assert!(repo_car.len() > 100, "Repo CAR should have substantial data");
2045
+
2046
+
let list_blobs_res = client
2047
+
.get(format!(
2048
+
"{}/xrpc/com.atproto.sync.listBlobs",
2049
+
base_url().await
2050
+
))
2051
+
.query(&[("did", did.as_str())])
2052
+
.send()
2053
+
.await
2054
+
.expect("Failed to list blobs");
2055
+
assert_eq!(list_blobs_res.status(), StatusCode::OK);
2056
+
let blobs_body: Value = list_blobs_res.json().await.unwrap();
2057
+
let cids = blobs_body["cids"].as_array().unwrap();
2058
+
assert!(!cids.is_empty(), "Should have at least one blob");
2059
+
2060
+
let get_blob_res = client
2061
+
.get(format!(
2062
+
"{}/xrpc/com.atproto.sync.getBlob",
2063
+
base_url().await
2064
+
))
2065
+
.query(&[("did", did.as_str()), ("cid", &blob_cid)])
2066
+
.send()
2067
+
.await
2068
+
.expect("Failed to get blob");
2069
+
assert_eq!(get_blob_res.status(), StatusCode::OK);
2070
+
let retrieved_blob = get_blob_res.bytes().await.unwrap();
2071
+
assert_eq!(
2072
+
retrieved_blob.as_ref(),
2073
+
blob_data,
2074
+
"Retrieved blob should match uploaded data"
2075
+
);
2076
+
2077
+
let latest_commit_res = client
2078
+
.get(format!(
2079
+
"{}/xrpc/com.atproto.sync.getLatestCommit",
2080
+
base_url().await
2081
+
))
2082
+
.query(&[("did", did.as_str())])
2083
+
.send()
2084
+
.await
2085
+
.expect("Failed to get latest commit");
2086
+
assert_eq!(latest_commit_res.status(), StatusCode::OK);
2087
+
let commit_body: Value = latest_commit_res.json().await.unwrap();
2088
+
let root_cid = commit_body["cid"].as_str().unwrap();
2089
+
2090
+
let get_blocks_url = format!(
2091
+
"{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}",
2092
+
base_url().await,
2093
+
did,
2094
+
root_cid
2095
+
);
2096
+
let get_blocks_res = client
2097
+
.get(&get_blocks_url)
2098
+
.send()
2099
+
.await
2100
+
.expect("Failed to get blocks");
2101
+
assert_eq!(get_blocks_res.status(), StatusCode::OK);
2102
+
assert_eq!(
2103
+
get_blocks_res
2104
+
.headers()
2105
+
.get("content-type")
2106
+
.and_then(|h| h.to_str().ok()),
2107
+
Some("application/vnd.ipld.car")
2108
+
);
2109
+
}
2110
+
2111
+
#[tokio::test]
2112
+
async fn test_apply_writes_batch_lifecycle() {
2113
+
let client = client();
2114
+
let (did, jwt) = setup_new_user("apply-writes-batch").await;
2115
+
2116
+
let now = Utc::now().to_rfc3339();
2117
+
let writes_payload = json!({
2118
+
"repo": did,
2119
+
"writes": [
2120
+
{
2121
+
"$type": "com.atproto.repo.applyWrites#create",
2122
+
"collection": "app.bsky.feed.post",
2123
+
"rkey": "batch-post-1",
2124
+
"value": {
2125
+
"$type": "app.bsky.feed.post",
2126
+
"text": "First batch post",
2127
+
"createdAt": now
2128
+
}
2129
+
},
2130
+
{
2131
+
"$type": "com.atproto.repo.applyWrites#create",
2132
+
"collection": "app.bsky.feed.post",
2133
+
"rkey": "batch-post-2",
2134
+
"value": {
2135
+
"$type": "app.bsky.feed.post",
2136
+
"text": "Second batch post",
2137
+
"createdAt": now
2138
+
}
2139
+
},
2140
+
{
2141
+
"$type": "com.atproto.repo.applyWrites#create",
2142
+
"collection": "app.bsky.actor.profile",
2143
+
"rkey": "self",
2144
+
"value": {
2145
+
"$type": "app.bsky.actor.profile",
2146
+
"displayName": "Batch User"
2147
+
}
2148
+
}
2149
+
]
2150
+
});
2151
+
2152
+
let apply_res = client
2153
+
.post(format!(
2154
+
"{}/xrpc/com.atproto.repo.applyWrites",
2155
+
base_url().await
2156
+
))
2157
+
.bearer_auth(&jwt)
2158
+
.json(&writes_payload)
2159
+
.send()
2160
+
.await
2161
+
.expect("Failed to apply writes");
2162
+
2163
+
assert_eq!(apply_res.status(), StatusCode::OK);
2164
+
2165
+
let get_post1 = client
2166
+
.get(format!(
2167
+
"{}/xrpc/com.atproto.repo.getRecord",
2168
+
base_url().await
2169
+
))
2170
+
.query(&[
2171
+
("repo", did.as_str()),
2172
+
("collection", "app.bsky.feed.post"),
2173
+
("rkey", "batch-post-1"),
2174
+
])
2175
+
.send()
2176
+
.await
2177
+
.expect("Failed to get post 1");
2178
+
assert_eq!(get_post1.status(), StatusCode::OK);
2179
+
let post1_body: Value = get_post1.json().await.unwrap();
2180
+
assert_eq!(post1_body["value"]["text"], "First batch post");
2181
+
2182
+
let get_post2 = client
2183
+
.get(format!(
2184
+
"{}/xrpc/com.atproto.repo.getRecord",
2185
+
base_url().await
2186
+
))
2187
+
.query(&[
2188
+
("repo", did.as_str()),
2189
+
("collection", "app.bsky.feed.post"),
2190
+
("rkey", "batch-post-2"),
2191
+
])
2192
+
.send()
2193
+
.await
2194
+
.expect("Failed to get post 2");
2195
+
assert_eq!(get_post2.status(), StatusCode::OK);
2196
+
2197
+
let get_profile = client
2198
+
.get(format!(
2199
+
"{}/xrpc/com.atproto.repo.getRecord",
2200
+
base_url().await
2201
+
))
2202
+
.query(&[
2203
+
("repo", did.as_str()),
2204
+
("collection", "app.bsky.actor.profile"),
2205
+
("rkey", "self"),
2206
+
])
2207
+
.send()
2208
+
.await
2209
+
.expect("Failed to get profile");
2210
+
assert_eq!(get_profile.status(), StatusCode::OK);
2211
+
let profile_body: Value = get_profile.json().await.unwrap();
2212
+
assert_eq!(profile_body["value"]["displayName"], "Batch User");
2213
+
2214
+
let update_writes = json!({
2215
+
"repo": did,
2216
+
"writes": [
2217
+
{
2218
+
"$type": "com.atproto.repo.applyWrites#update",
2219
+
"collection": "app.bsky.actor.profile",
2220
+
"rkey": "self",
2221
+
"value": {
2222
+
"$type": "app.bsky.actor.profile",
2223
+
"displayName": "Updated Batch User"
2224
+
}
2225
+
},
2226
+
{
2227
+
"$type": "com.atproto.repo.applyWrites#delete",
2228
+
"collection": "app.bsky.feed.post",
2229
+
"rkey": "batch-post-1"
2230
+
}
2231
+
]
2232
+
});
2233
+
2234
+
let update_res = client
2235
+
.post(format!(
2236
+
"{}/xrpc/com.atproto.repo.applyWrites",
2237
+
base_url().await
2238
+
))
2239
+
.bearer_auth(&jwt)
2240
+
.json(&update_writes)
2241
+
.send()
2242
+
.await
2243
+
.expect("Failed to apply update writes");
2244
+
assert_eq!(update_res.status(), StatusCode::OK);
2245
+
2246
+
let get_updated_profile = client
2247
+
.get(format!(
2248
+
"{}/xrpc/com.atproto.repo.getRecord",
2249
+
base_url().await
2250
+
))
2251
+
.query(&[
2252
+
("repo", did.as_str()),
2253
+
("collection", "app.bsky.actor.profile"),
2254
+
("rkey", "self"),
2255
+
])
2256
+
.send()
2257
+
.await
2258
+
.expect("Failed to get updated profile");
2259
+
let updated_profile: Value = get_updated_profile.json().await.unwrap();
2260
+
assert_eq!(updated_profile["value"]["displayName"], "Updated Batch User");
2261
+
2262
+
let get_deleted_post = client
2263
+
.get(format!(
2264
+
"{}/xrpc/com.atproto.repo.getRecord",
2265
+
base_url().await
2266
+
))
2267
+
.query(&[
2268
+
("repo", did.as_str()),
2269
+
("collection", "app.bsky.feed.post"),
2270
+
("rkey", "batch-post-1"),
2271
+
])
2272
+
.send()
2273
+
.await
2274
+
.expect("Failed to check deleted post");
2275
+
assert_eq!(
2276
+
get_deleted_post.status(),
2277
+
StatusCode::NOT_FOUND,
2278
+
"Batch-deleted post should be gone"
2279
+
);
2280
+
}
2281
+
2282
+
#[tokio::test]
2283
+
async fn test_resolve_handle_lifecycle() {
2284
+
let client = client();
2285
+
let ts = Utc::now().timestamp_millis();
2286
+
let handle = format!("resolve-test-{}.test", ts);
2287
+
let email = format!("resolve-test-{}@test.com", ts);
2288
+
2289
+
let create_res = client
2290
+
.post(format!(
2291
+
"{}/xrpc/com.atproto.server.createAccount",
2292
+
base_url().await
2293
+
))
2294
+
.json(&json!({
2295
+
"handle": handle,
2296
+
"email": email,
2297
+
"password": "resolve-test-pw"
2298
+
}))
2299
+
.send()
2300
+
.await
2301
+
.expect("Failed to create account");
2302
+
assert_eq!(create_res.status(), StatusCode::OK);
2303
+
let account: Value = create_res.json().await.unwrap();
2304
+
let did = account["did"].as_str().unwrap();
2305
+
2306
+
let resolve_res = client
2307
+
.get(format!(
2308
+
"{}/xrpc/com.atproto.identity.resolveHandle",
2309
+
base_url().await
2310
+
))
2311
+
.query(&[("handle", handle.as_str())])
2312
+
.send()
2313
+
.await
2314
+
.expect("Failed to resolve handle");
2315
+
2316
+
assert_eq!(resolve_res.status(), StatusCode::OK);
2317
+
let resolve_body: Value = resolve_res.json().await.unwrap();
2318
+
assert_eq!(resolve_body["did"], did);
2319
+
}
2320
+
2321
+
#[tokio::test]
2322
+
async fn test_service_auth_lifecycle() {
2323
+
let client = client();
2324
+
let (did, jwt) = setup_new_user("service-auth-test").await;
2325
+
2326
+
let service_auth_res = client
2327
+
.get(format!(
2328
+
"{}/xrpc/com.atproto.server.getServiceAuth",
2329
+
base_url().await
2330
+
))
2331
+
.query(&[
2332
+
("aud", "did:web:api.bsky.app"),
2333
+
("lxm", "com.atproto.repo.uploadBlob"),
2334
+
])
2335
+
.bearer_auth(&jwt)
2336
+
.send()
2337
+
.await
2338
+
.expect("Failed to get service auth");
2339
+
2340
+
assert_eq!(service_auth_res.status(), StatusCode::OK);
2341
+
let auth_body: Value = service_auth_res.json().await.unwrap();
2342
+
let service_token = auth_body["token"].as_str().expect("No token in response");
2343
+
2344
+
let parts: Vec<&str> = service_token.split('.').collect();
2345
+
assert_eq!(parts.len(), 3, "Service token should be a valid JWT");
2346
+
2347
+
let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
2348
+
.decode(parts[1])
2349
+
.expect("Failed to decode JWT payload");
2350
+
let claims: Value = serde_json::from_slice(&payload_bytes).expect("Invalid JWT payload");
2351
+
2352
+
assert_eq!(claims["iss"], did);
2353
+
assert_eq!(claims["aud"], "did:web:api.bsky.app");
2354
+
assert_eq!(claims["lxm"], "com.atproto.repo.uploadBlob");
2355
+
}
2356
+
2357
+
#[tokio::test]
2358
+
async fn test_moderation_report_lifecycle() {
2359
+
let client = client();
2360
+
let (alice_did, alice_jwt) = setup_new_user("alice-report").await;
2361
+
let (bob_did, bob_jwt) = setup_new_user("bob-report").await;
2362
+
2363
+
let (post_uri, post_cid) =
2364
+
create_post(&client, &bob_did, &bob_jwt, "This is a reportable post").await;
2365
+
2366
+
let report_payload = json!({
2367
+
"reasonType": "com.atproto.moderation.defs#reasonSpam",
2368
+
"reason": "This looks like spam to me",
2369
+
"subject": {
2370
+
"$type": "com.atproto.repo.strongRef",
2371
+
"uri": post_uri,
2372
+
"cid": post_cid
2373
+
}
2374
+
});
2375
+
2376
+
let report_res = client
2377
+
.post(format!(
2378
+
"{}/xrpc/com.atproto.moderation.createReport",
2379
+
base_url().await
2380
+
))
2381
+
.bearer_auth(&alice_jwt)
2382
+
.json(&report_payload)
2383
+
.send()
2384
+
.await
2385
+
.expect("Failed to create report");
2386
+
2387
+
assert_eq!(report_res.status(), StatusCode::OK);
2388
+
let report_body: Value = report_res.json().await.unwrap();
2389
+
assert!(report_body["id"].is_number(), "Report should have an ID");
2390
+
assert_eq!(report_body["reasonType"], "com.atproto.moderation.defs#reasonSpam");
2391
+
assert_eq!(report_body["reportedBy"], alice_did);
2392
+
2393
+
let account_report_payload = json!({
2394
+
"reasonType": "com.atproto.moderation.defs#reasonOther",
2395
+
"reason": "Suspicious account activity",
2396
+
"subject": {
2397
+
"$type": "com.atproto.admin.defs#repoRef",
2398
+
"did": bob_did
2399
+
}
2400
+
});
2401
+
2402
+
let account_report_res = client
2403
+
.post(format!(
2404
+
"{}/xrpc/com.atproto.moderation.createReport",
2405
+
base_url().await
2406
+
))
2407
+
.bearer_auth(&alice_jwt)
2408
+
.json(&account_report_payload)
2409
+
.send()
2410
+
.await
2411
+
.expect("Failed to create account report");
2412
+
2413
+
assert_eq!(account_report_res.status(), StatusCode::OK);
2414
+
}
+66
tests/proxy.rs
+66
tests/proxy.rs
···
98
98
assert_eq!(claims["aud"], upstream_url);
99
99
assert_eq!(claims["lxm"], "com.example.signed");
100
100
}
101
+
102
+
#[tokio::test]
103
+
async fn test_proxy_post_with_body() {
104
+
let app_url = common::base_url().await;
105
+
let (upstream_url, mut rx) = spawn_mock_upstream().await;
106
+
let client = Client::new();
107
+
108
+
let payload = serde_json::json!({
109
+
"text": "Hello from proxy test",
110
+
"createdAt": "2024-01-01T00:00:00Z"
111
+
});
112
+
113
+
let res = client
114
+
.post(format!("{}/xrpc/com.example.postMethod", app_url))
115
+
.header("atproto-proxy", &upstream_url)
116
+
.header("Authorization", "Bearer test-token")
117
+
.json(&payload)
118
+
.send()
119
+
.await
120
+
.unwrap();
121
+
122
+
assert_eq!(res.status(), StatusCode::OK);
123
+
124
+
let (method, uri, auth) = rx.recv().await.expect("Upstream should receive request");
125
+
assert_eq!(method, "POST");
126
+
assert_eq!(uri, "/xrpc/com.example.postMethod");
127
+
assert_eq!(auth, Some("Bearer test-token".to_string()));
128
+
}
129
+
130
+
#[tokio::test]
131
+
async fn test_proxy_with_query_params() {
132
+
let app_url = common::base_url().await;
133
+
let (upstream_url, mut rx) = spawn_mock_upstream().await;
134
+
let client = Client::new();
135
+
136
+
let res = client
137
+
.get(format!(
138
+
"{}/xrpc/com.example.query?repo=did:plc:test&collection=app.bsky.feed.post&limit=50",
139
+
app_url
140
+
))
141
+
.header("atproto-proxy", &upstream_url)
142
+
.header("Authorization", "Bearer test-token")
143
+
.send()
144
+
.await
145
+
.unwrap();
146
+
147
+
assert_eq!(res.status(), StatusCode::OK);
148
+
149
+
let (method, uri, _auth) = rx.recv().await.expect("Upstream should receive request");
150
+
assert_eq!(method, "GET");
151
+
assert!(
152
+
uri.contains("repo=did") || uri.contains("repo=did%3Aplc%3Atest"),
153
+
"URI should contain repo param, got: {}",
154
+
uri
155
+
);
156
+
assert!(
157
+
uri.contains("collection=app.bsky.feed.post") || uri.contains("collection=app.bsky"),
158
+
"URI should contain collection param, got: {}",
159
+
uri
160
+
);
161
+
assert!(
162
+
uri.contains("limit=50"),
163
+
"URI should contain limit param, got: {}",
164
+
uri
165
+
);
166
+
}
+203
tests/sync.rs
+203
tests/sync.rs
···
3
3
use reqwest::StatusCode;
4
4
use reqwest::header;
5
5
use serde_json::Value;
6
+
use chrono;
6
7
7
8
#[tokio::test]
8
9
async fn test_get_latest_commit_success() {
···
352
353
353
354
assert_eq!(res.status(), StatusCode::OK);
354
355
}
356
+
357
+
#[tokio::test]
358
+
async fn test_get_repo_success() {
359
+
let client = client();
360
+
let (access_jwt, did) = create_account_and_login(&client).await;
361
+
362
+
let post_payload = serde_json::json!({
363
+
"repo": did,
364
+
"collection": "app.bsky.feed.post",
365
+
"record": {
366
+
"$type": "app.bsky.feed.post",
367
+
"text": "Test post for getRepo",
368
+
"createdAt": chrono::Utc::now().to_rfc3339()
369
+
}
370
+
});
371
+
let _ = client
372
+
.post(format!(
373
+
"{}/xrpc/com.atproto.repo.createRecord",
374
+
base_url().await
375
+
))
376
+
.bearer_auth(&access_jwt)
377
+
.json(&post_payload)
378
+
.send()
379
+
.await
380
+
.expect("Failed to create record");
381
+
382
+
let params = [("did", did.as_str())];
383
+
let res = client
384
+
.get(format!(
385
+
"{}/xrpc/com.atproto.sync.getRepo",
386
+
base_url().await
387
+
))
388
+
.query(¶ms)
389
+
.send()
390
+
.await
391
+
.expect("Failed to send request");
392
+
393
+
assert_eq!(res.status(), StatusCode::OK);
394
+
assert_eq!(
395
+
res.headers()
396
+
.get("content-type")
397
+
.and_then(|h| h.to_str().ok()),
398
+
Some("application/vnd.ipld.car")
399
+
);
400
+
let body = res.bytes().await.expect("Failed to get body");
401
+
assert!(!body.is_empty());
402
+
}
403
+
404
+
#[tokio::test]
405
+
async fn test_get_repo_not_found() {
406
+
let client = client();
407
+
let params = [("did", "did:plc:nonexistent12345")];
408
+
let res = client
409
+
.get(format!(
410
+
"{}/xrpc/com.atproto.sync.getRepo",
411
+
base_url().await
412
+
))
413
+
.query(¶ms)
414
+
.send()
415
+
.await
416
+
.expect("Failed to send request");
417
+
418
+
assert_eq!(res.status(), StatusCode::NOT_FOUND);
419
+
let body: Value = res.json().await.expect("Response was not valid JSON");
420
+
assert_eq!(body["error"], "RepoNotFound");
421
+
}
422
+
423
+
#[tokio::test]
424
+
async fn test_get_record_sync_success() {
425
+
let client = client();
426
+
let (access_jwt, did) = create_account_and_login(&client).await;
427
+
428
+
let post_payload = serde_json::json!({
429
+
"repo": did,
430
+
"collection": "app.bsky.feed.post",
431
+
"record": {
432
+
"$type": "app.bsky.feed.post",
433
+
"text": "Test post for sync getRecord",
434
+
"createdAt": chrono::Utc::now().to_rfc3339()
435
+
}
436
+
});
437
+
let create_res = client
438
+
.post(format!(
439
+
"{}/xrpc/com.atproto.repo.createRecord",
440
+
base_url().await
441
+
))
442
+
.bearer_auth(&access_jwt)
443
+
.json(&post_payload)
444
+
.send()
445
+
.await
446
+
.expect("Failed to create record");
447
+
448
+
let create_body: Value = create_res.json().await.expect("Invalid JSON");
449
+
let uri = create_body["uri"].as_str().expect("No URI");
450
+
let rkey = uri.split('/').last().expect("Invalid URI");
451
+
452
+
let params = [
453
+
("did", did.as_str()),
454
+
("collection", "app.bsky.feed.post"),
455
+
("rkey", rkey),
456
+
];
457
+
let res = client
458
+
.get(format!(
459
+
"{}/xrpc/com.atproto.sync.getRecord",
460
+
base_url().await
461
+
))
462
+
.query(¶ms)
463
+
.send()
464
+
.await
465
+
.expect("Failed to send request");
466
+
467
+
assert_eq!(res.status(), StatusCode::OK);
468
+
assert_eq!(
469
+
res.headers()
470
+
.get("content-type")
471
+
.and_then(|h| h.to_str().ok()),
472
+
Some("application/vnd.ipld.car")
473
+
);
474
+
let body = res.bytes().await.expect("Failed to get body");
475
+
assert!(!body.is_empty());
476
+
}
477
+
478
+
#[tokio::test]
479
+
async fn test_get_record_sync_not_found() {
480
+
let client = client();
481
+
let (_, did) = create_account_and_login(&client).await;
482
+
483
+
let params = [
484
+
("did", did.as_str()),
485
+
("collection", "app.bsky.feed.post"),
486
+
("rkey", "nonexistent12345"),
487
+
];
488
+
let res = client
489
+
.get(format!(
490
+
"{}/xrpc/com.atproto.sync.getRecord",
491
+
base_url().await
492
+
))
493
+
.query(¶ms)
494
+
.send()
495
+
.await
496
+
.expect("Failed to send request");
497
+
498
+
assert_eq!(res.status(), StatusCode::NOT_FOUND);
499
+
let body: Value = res.json().await.expect("Response was not valid JSON");
500
+
assert_eq!(body["error"], "RecordNotFound");
501
+
}
502
+
503
+
#[tokio::test]
504
+
async fn test_get_blocks_success() {
505
+
let client = client();
506
+
let (_, did) = create_account_and_login(&client).await;
507
+
508
+
let params = [("did", did.as_str())];
509
+
let latest_res = client
510
+
.get(format!(
511
+
"{}/xrpc/com.atproto.sync.getLatestCommit",
512
+
base_url().await
513
+
))
514
+
.query(¶ms)
515
+
.send()
516
+
.await
517
+
.expect("Failed to get latest commit");
518
+
519
+
let latest_body: Value = latest_res.json().await.expect("Invalid JSON");
520
+
let root_cid = latest_body["cid"].as_str().expect("No CID");
521
+
522
+
let url = format!(
523
+
"{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}",
524
+
base_url().await,
525
+
did,
526
+
root_cid
527
+
);
528
+
let res = client
529
+
.get(&url)
530
+
.send()
531
+
.await
532
+
.expect("Failed to send request");
533
+
534
+
assert_eq!(res.status(), StatusCode::OK);
535
+
assert_eq!(
536
+
res.headers()
537
+
.get("content-type")
538
+
.and_then(|h| h.to_str().ok()),
539
+
Some("application/vnd.ipld.car")
540
+
);
541
+
}
542
+
543
+
#[tokio::test]
544
+
async fn test_get_blocks_not_found() {
545
+
let client = client();
546
+
let url = format!(
547
+
"{}/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent12345&cids=bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku",
548
+
base_url().await
549
+
);
550
+
let res = client
551
+
.get(&url)
552
+
.send()
553
+
.await
554
+
.expect("Failed to send request");
555
+
556
+
assert_eq!(res.status(), StatusCode::NOT_FOUND);
557
+
}