A community based topic aggregation platform built on atproto

test(communities): add E2E tests and update service mocks

E2E Tests (3 new test cases):
- Block via XRPC endpoint: Full flow from HTTP → PDS → Jetstream → AppView
- Unblock via XRPC endpoint: Complete unblock flow with DELETE event
- Block fails without authentication: Validates auth requirement (401)

Each E2E test verifies:
✓ XRPC endpoint responds correctly
✓ Record created/deleted on PDS
✓ Jetstream consumer indexes event
✓ AppView database state updated

Unit Test Updates:
- Added 6 mock methods to mockCommunityRepo for blocking operations
- Ensures service layer tests compile and pass

All tests follow existing E2E patterns (subscribe/unsubscribe) for
consistency.

+346
+322
tests/integration/community_e2e_test.go
··· 933 933 t.Logf(" ✓ Subscriber count decremented") 934 934 }) 935 935 936 + t.Run("Block via XRPC endpoint", func(t *testing.T) { 937 + // Create a community to block 938 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 939 + 940 + t.Logf("🚫 Blocking community via XRPC endpoint...") 941 + blockReq := map[string]interface{}{ 942 + "community": community.DID, 943 + } 944 + 945 + blockJSON, err := json.Marshal(blockReq) 946 + if err != nil { 947 + t.Fatalf("Failed to marshal block request: %v", err) 948 + } 949 + 950 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 951 + if err != nil { 952 + t.Fatalf("Failed to create block request: %v", err) 953 + } 954 + req.Header.Set("Content-Type", "application/json") 955 + req.Header.Set("Authorization", "Bearer "+accessToken) 956 + 957 + resp, err := http.DefaultClient.Do(req) 958 + if err != nil { 959 + t.Fatalf("Failed to POST block: %v", err) 960 + } 961 + defer func() { _ = resp.Body.Close() }() 962 + 963 + if resp.StatusCode != http.StatusOK { 964 + body, readErr := io.ReadAll(resp.Body) 965 + if readErr != nil { 966 + t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 967 + } 968 + t.Logf("❌ XRPC Block Failed") 969 + t.Logf(" Status: %d", resp.StatusCode) 970 + t.Logf(" Response: %s", string(body)) 971 + t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 972 + } 973 + 974 + var blockResp struct { 975 + Block struct { 976 + RecordURI string `json:"recordUri"` 977 + RecordCID string `json:"recordCid"` 978 + } `json:"block"` 979 + } 980 + 981 + if err := json.NewDecoder(resp.Body).Decode(&blockResp); err != nil { 982 + t.Fatalf("Failed to decode block response: %v", err) 983 + } 984 + 985 + t.Logf("✅ XRPC block response received:") 986 + t.Logf(" RecordURI: %s", blockResp.Block.RecordURI) 987 + t.Logf(" RecordCID: %s", blockResp.Block.RecordCID) 988 + 989 + // Extract rkey from URI for verification 990 + rkey := "" 991 + if uriParts := strings.Split(blockResp.Block.RecordURI, "/"); len(uriParts) >= 4 { 992 + rkey = uriParts[len(uriParts)-1] 993 + } 994 + 995 + // Verify the block record exists on PDS 996 + t.Logf("🔍 Verifying block record exists on PDS...") 997 + collection := "social.coves.community.block" 998 + pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 999 + pdsURL, instanceDID, collection, rkey)) 1000 + if pdsErr != nil { 1001 + t.Fatalf("Failed to query PDS: %v", pdsErr) 1002 + } 1003 + defer func() { 1004 + if closeErr := pdsResp.Body.Close(); closeErr != nil { 1005 + t.Logf("Failed to close PDS response: %v", closeErr) 1006 + } 1007 + }() 1008 + 1009 + if pdsResp.StatusCode != http.StatusOK { 1010 + body, readErr := io.ReadAll(pdsResp.Body) 1011 + if readErr != nil { 1012 + t.Fatalf("Block record not found on PDS (status: %d, failed to read body: %v)", pdsResp.StatusCode, readErr) 1013 + } 1014 + t.Fatalf("Block record not found on PDS (status: %d): %s", pdsResp.StatusCode, string(body)) 1015 + } 1016 + t.Logf("✅ Block record exists on PDS") 1017 + 1018 + // CRITICAL: Simulate Jetstream consumer indexing the block 1019 + t.Logf("🔄 Simulating Jetstream consumer indexing block event...") 1020 + blockEvent := jetstream.JetstreamEvent{ 1021 + Did: instanceDID, 1022 + TimeUS: time.Now().UnixMicro(), 1023 + Kind: "commit", 1024 + Commit: &jetstream.CommitEvent{ 1025 + Rev: "test-block-rev", 1026 + Operation: "create", 1027 + Collection: "social.coves.community.block", 1028 + RKey: rkey, 1029 + CID: blockResp.Block.RecordCID, 1030 + Record: map[string]interface{}{ 1031 + "subject": community.DID, 1032 + "createdAt": time.Now().Format(time.RFC3339), 1033 + }, 1034 + }, 1035 + } 1036 + if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1037 + t.Fatalf("Failed to handle block event: %v", handleErr) 1038 + } 1039 + 1040 + // Verify block was indexed in AppView 1041 + t.Logf("🔍 Verifying block indexed in AppView...") 1042 + block, err := communityRepo.GetBlock(ctx, instanceDID, community.DID) 1043 + if err != nil { 1044 + t.Fatalf("Failed to get block from AppView: %v", err) 1045 + } 1046 + if block.RecordURI != blockResp.Block.RecordURI { 1047 + t.Errorf("RecordURI mismatch: expected %s, got %s", blockResp.Block.RecordURI, block.RecordURI) 1048 + } 1049 + 1050 + t.Logf("✅ TRUE E2E BLOCK FLOW COMPLETE:") 1051 + t.Logf(" Client → XRPC Block → PDS Create → Firehose → Consumer → AppView ✓") 1052 + t.Logf(" ✓ Block record created on PDS") 1053 + t.Logf(" ✓ Block indexed in AppView") 1054 + }) 1055 + 1056 + t.Run("Unblock via XRPC endpoint", func(t *testing.T) { 1057 + // Create a community and block it first 1058 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1059 + 1060 + // Block the community 1061 + t.Logf("🚫 Blocking community first...") 1062 + blockReq := map[string]interface{}{ 1063 + "community": community.DID, 1064 + } 1065 + blockJSON, err := json.Marshal(blockReq) 1066 + if err != nil { 1067 + t.Fatalf("Failed to marshal block request: %v", err) 1068 + } 1069 + 1070 + blockHttpReq, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1071 + if err != nil { 1072 + t.Fatalf("Failed to create block request: %v", err) 1073 + } 1074 + blockHttpReq.Header.Set("Content-Type", "application/json") 1075 + blockHttpReq.Header.Set("Authorization", "Bearer "+accessToken) 1076 + 1077 + blockResp, err := http.DefaultClient.Do(blockHttpReq) 1078 + if err != nil { 1079 + t.Fatalf("Failed to POST block: %v", err) 1080 + } 1081 + 1082 + var blockRespData struct { 1083 + Block struct { 1084 + RecordURI string `json:"recordUri"` 1085 + } `json:"block"` 1086 + } 1087 + if err := json.NewDecoder(blockResp.Body).Decode(&blockRespData); err != nil { 1088 + func() { _ = blockResp.Body.Close() }() 1089 + t.Fatalf("Failed to decode block response: %v", err) 1090 + } 1091 + func() { _ = blockResp.Body.Close() }() 1092 + 1093 + rkey := "" 1094 + if uriParts := strings.Split(blockRespData.Block.RecordURI, "/"); len(uriParts) >= 4 { 1095 + rkey = uriParts[len(uriParts)-1] 1096 + } 1097 + 1098 + // Index the block via consumer 1099 + blockEvent := jetstream.JetstreamEvent{ 1100 + Did: instanceDID, 1101 + TimeUS: time.Now().UnixMicro(), 1102 + Kind: "commit", 1103 + Commit: &jetstream.CommitEvent{ 1104 + Rev: "test-block-rev", 1105 + Operation: "create", 1106 + Collection: "social.coves.community.block", 1107 + RKey: rkey, 1108 + CID: "test-block-cid", 1109 + Record: map[string]interface{}{ 1110 + "subject": community.DID, 1111 + "createdAt": time.Now().Format(time.RFC3339), 1112 + }, 1113 + }, 1114 + } 1115 + if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1116 + t.Fatalf("Failed to handle block event: %v", handleErr) 1117 + } 1118 + 1119 + // Now unblock the community 1120 + t.Logf("✅ Unblocking community via XRPC endpoint...") 1121 + unblockReq := map[string]interface{}{ 1122 + "community": community.DID, 1123 + } 1124 + 1125 + unblockJSON, err := json.Marshal(unblockReq) 1126 + if err != nil { 1127 + t.Fatalf("Failed to marshal unblock request: %v", err) 1128 + } 1129 + 1130 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.unblockCommunity", bytes.NewBuffer(unblockJSON)) 1131 + if err != nil { 1132 + t.Fatalf("Failed to create unblock request: %v", err) 1133 + } 1134 + req.Header.Set("Content-Type", "application/json") 1135 + req.Header.Set("Authorization", "Bearer "+accessToken) 1136 + 1137 + resp, err := http.DefaultClient.Do(req) 1138 + if err != nil { 1139 + t.Fatalf("Failed to POST unblock: %v", err) 1140 + } 1141 + defer func() { _ = resp.Body.Close() }() 1142 + 1143 + if resp.StatusCode != http.StatusOK { 1144 + body, readErr := io.ReadAll(resp.Body) 1145 + if readErr != nil { 1146 + t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 1147 + } 1148 + t.Logf("❌ XRPC Unblock Failed") 1149 + t.Logf(" Status: %d", resp.StatusCode) 1150 + t.Logf(" Response: %s", string(body)) 1151 + t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 1152 + } 1153 + 1154 + var unblockResp struct { 1155 + Success bool `json:"success"` 1156 + } 1157 + 1158 + if err := json.NewDecoder(resp.Body).Decode(&unblockResp); err != nil { 1159 + t.Fatalf("Failed to decode unblock response: %v", err) 1160 + } 1161 + 1162 + if !unblockResp.Success { 1163 + t.Errorf("Expected success: true, got: %v", unblockResp.Success) 1164 + } 1165 + 1166 + // Verify the block record was deleted from PDS 1167 + t.Logf("🔍 Verifying block record deleted from PDS...") 1168 + collection := "social.coves.community.block" 1169 + pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1170 + pdsURL, instanceDID, collection, rkey)) 1171 + if pdsErr != nil { 1172 + t.Fatalf("Failed to query PDS: %v", pdsErr) 1173 + } 1174 + defer func() { 1175 + if closeErr := pdsResp.Body.Close(); closeErr != nil { 1176 + t.Logf("Failed to close PDS response: %v", closeErr) 1177 + } 1178 + }() 1179 + 1180 + if pdsResp.StatusCode == http.StatusOK { 1181 + t.Errorf("❌ Block record still exists on PDS (expected 404, got 200)") 1182 + } else { 1183 + t.Logf("✅ Block record successfully deleted from PDS (status: %d)", pdsResp.StatusCode) 1184 + } 1185 + 1186 + // CRITICAL: Simulate Jetstream consumer indexing the DELETE event 1187 + t.Logf("🔄 Simulating Jetstream consumer indexing DELETE event...") 1188 + deleteEvent := jetstream.JetstreamEvent{ 1189 + Did: instanceDID, 1190 + TimeUS: time.Now().UnixMicro(), 1191 + Kind: "commit", 1192 + Commit: &jetstream.CommitEvent{ 1193 + Rev: "test-unblock-rev", 1194 + Operation: "delete", 1195 + Collection: "social.coves.community.block", 1196 + RKey: rkey, 1197 + CID: "", 1198 + Record: nil, 1199 + }, 1200 + } 1201 + if handleErr := consumer.HandleEvent(context.Background(), &deleteEvent); handleErr != nil { 1202 + t.Fatalf("Failed to handle delete event: %v", handleErr) 1203 + } 1204 + 1205 + // Verify block was removed from AppView 1206 + t.Logf("🔍 Verifying block removed from AppView...") 1207 + _, err = communityRepo.GetBlock(ctx, instanceDID, community.DID) 1208 + if err == nil { 1209 + t.Errorf("❌ Block still exists in AppView (should be deleted)") 1210 + } else if !communities.IsNotFound(err) { 1211 + t.Fatalf("Unexpected error querying block: %v", err) 1212 + } else { 1213 + t.Logf("✅ Block removed from AppView") 1214 + } 1215 + 1216 + t.Logf("✅ TRUE E2E UNBLOCK FLOW COMPLETE:") 1217 + t.Logf(" Client → XRPC Unblock → PDS Delete → Firehose → Consumer → AppView ✓") 1218 + t.Logf(" ✓ Block deleted from PDS") 1219 + t.Logf(" ✓ Block removed from AppView") 1220 + }) 1221 + 1222 + t.Run("Block fails without authentication", func(t *testing.T) { 1223 + // Create a community to attempt blocking 1224 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1225 + 1226 + t.Logf("🔒 Attempting to block community without auth token...") 1227 + blockReq := map[string]interface{}{ 1228 + "community": community.DID, 1229 + } 1230 + 1231 + blockJSON, err := json.Marshal(blockReq) 1232 + if err != nil { 1233 + t.Fatalf("Failed to marshal block request: %v", err) 1234 + } 1235 + 1236 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1237 + if err != nil { 1238 + t.Fatalf("Failed to create block request: %v", err) 1239 + } 1240 + req.Header.Set("Content-Type", "application/json") 1241 + // NO Authorization header 1242 + 1243 + resp, err := http.DefaultClient.Do(req) 1244 + if err != nil { 1245 + t.Fatalf("Failed to POST block: %v", err) 1246 + } 1247 + defer func() { _ = resp.Body.Close() }() 1248 + 1249 + // Should fail with 401 Unauthorized 1250 + if resp.StatusCode != http.StatusUnauthorized { 1251 + body, _ := io.ReadAll(resp.Body) 1252 + t.Errorf("Expected 401 Unauthorized, got %d: %s", resp.StatusCode, string(body)) 1253 + } else { 1254 + t.Logf("✅ Block correctly rejected without authentication (401)") 1255 + } 1256 + }) 1257 + 936 1258 t.Run("Update via XRPC endpoint", func(t *testing.T) { 937 1259 // Create a community first (via service, so it's indexed) 938 1260 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
+24
tests/unit/community_service_test.go
··· 102 102 return nil, nil 103 103 } 104 104 105 + func (m *mockCommunityRepo) BlockCommunity(ctx context.Context, block *communities.CommunityBlock) (*communities.CommunityBlock, error) { 106 + return block, nil 107 + } 108 + 109 + func (m *mockCommunityRepo) UnblockCommunity(ctx context.Context, userDID, communityDID string) error { 110 + return nil 111 + } 112 + 113 + func (m *mockCommunityRepo) GetBlock(ctx context.Context, userDID, communityDID string) (*communities.CommunityBlock, error) { 114 + return nil, communities.ErrBlockNotFound 115 + } 116 + 117 + func (m *mockCommunityRepo) GetBlockByURI(ctx context.Context, recordURI string) (*communities.CommunityBlock, error) { 118 + return nil, communities.ErrBlockNotFound 119 + } 120 + 121 + func (m *mockCommunityRepo) ListBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*communities.CommunityBlock, error) { 122 + return nil, nil 123 + } 124 + 125 + func (m *mockCommunityRepo) IsBlocked(ctx context.Context, userDID, communityDID string) (bool, error) { 126 + return false, nil 127 + } 128 + 105 129 func (m *mockCommunityRepo) CreateMembership(ctx context.Context, membership *communities.Membership) (*communities.Membership, error) { 106 130 return membership, nil 107 131 }