A community based topic aggregation platform built on atproto

Merge branch 'feat/community-blocking'

+1593 -85
+172
internal/api/handlers/community/block.go
··· 1 + package community 2 + 3 + import ( 4 + "Coves/internal/api/middleware" 5 + "Coves/internal/core/communities" 6 + "encoding/json" 7 + "log" 8 + "net/http" 9 + "regexp" 10 + "strings" 11 + ) 12 + 13 + // Package-level compiled regex for DID validation (compiled once at startup) 14 + var ( 15 + didRegex = regexp.MustCompile(`^did:(plc|web):[a-zA-Z0-9._:%-]+$`) 16 + ) 17 + 18 + // BlockHandler handles community blocking operations 19 + type BlockHandler struct { 20 + service communities.Service 21 + } 22 + 23 + // NewBlockHandler creates a new block handler 24 + func NewBlockHandler(service communities.Service) *BlockHandler { 25 + return &BlockHandler{ 26 + service: service, 27 + } 28 + } 29 + 30 + // HandleBlock blocks a community 31 + // POST /xrpc/social.coves.community.blockCommunity 32 + // 33 + // Request body: { "community": "did:plc:xxx" } 34 + // Note: Per lexicon spec, only DIDs are accepted (not handles). 35 + // The block record's "subject" field requires format: "did". 36 + func (h *BlockHandler) HandleBlock(w http.ResponseWriter, r *http.Request) { 37 + if r.Method != http.MethodPost { 38 + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) 39 + return 40 + } 41 + 42 + // Parse request body 43 + var req struct { 44 + Community string `json:"community"` // DID only (per lexicon) 45 + } 46 + 47 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 48 + writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body") 49 + return 50 + } 51 + 52 + if req.Community == "" { 53 + writeError(w, http.StatusBadRequest, "InvalidRequest", "community is required") 54 + return 55 + } 56 + 57 + // Validate DID format (per lexicon: format must be "did") 58 + if !strings.HasPrefix(req.Community, "did:") { 59 + writeError(w, http.StatusBadRequest, "InvalidRequest", 60 + "community must be a DID (did:plc:... or did:web:...)") 61 + return 62 + } 63 + 64 + // Validate DID format with regex: did:method:identifier 65 + if !didRegex.MatchString(req.Community) { 66 + writeError(w, http.StatusBadRequest, "InvalidRequest", "invalid DID format") 67 + return 68 + } 69 + 70 + // Extract authenticated user DID and access token from request context (injected by auth middleware) 71 + userDID := middleware.GetUserDID(r) 72 + if userDID == "" { 73 + writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 74 + return 75 + } 76 + 77 + userAccessToken := middleware.GetUserAccessToken(r) 78 + if userAccessToken == "" { 79 + writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 80 + return 81 + } 82 + 83 + // Block via service (write-forward to PDS) 84 + block, err := h.service.BlockCommunity(r.Context(), userDID, userAccessToken, req.Community) 85 + if err != nil { 86 + handleServiceError(w, err) 87 + return 88 + } 89 + 90 + // Return success response (following atProto conventions for block responses) 91 + response := map[string]interface{}{ 92 + "block": map[string]interface{}{ 93 + "recordUri": block.RecordURI, 94 + "recordCid": block.RecordCID, 95 + }, 96 + } 97 + 98 + w.Header().Set("Content-Type", "application/json") 99 + w.WriteHeader(http.StatusOK) 100 + if err := json.NewEncoder(w).Encode(response); err != nil { 101 + log.Printf("Failed to encode response: %v", err) 102 + } 103 + } 104 + 105 + // HandleUnblock unblocks a community 106 + // POST /xrpc/social.coves.community.unblockCommunity 107 + // 108 + // Request body: { "community": "did:plc:xxx" } 109 + // Note: Per lexicon spec, only DIDs are accepted (not handles). 110 + func (h *BlockHandler) HandleUnblock(w http.ResponseWriter, r *http.Request) { 111 + if r.Method != http.MethodPost { 112 + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) 113 + return 114 + } 115 + 116 + // Parse request body 117 + var req struct { 118 + Community string `json:"community"` // DID only (per lexicon) 119 + } 120 + 121 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 122 + writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body") 123 + return 124 + } 125 + 126 + if req.Community == "" { 127 + writeError(w, http.StatusBadRequest, "InvalidRequest", "community is required") 128 + return 129 + } 130 + 131 + // Validate DID format (per lexicon: format must be "did") 132 + if !strings.HasPrefix(req.Community, "did:") { 133 + writeError(w, http.StatusBadRequest, "InvalidRequest", 134 + "community must be a DID (did:plc:... or did:web:...)") 135 + return 136 + } 137 + 138 + // Validate DID format with regex: did:method:identifier 139 + if !didRegex.MatchString(req.Community) { 140 + writeError(w, http.StatusBadRequest, "InvalidRequest", "invalid DID format") 141 + return 142 + } 143 + 144 + // Extract authenticated user DID and access token from request context (injected by auth middleware) 145 + userDID := middleware.GetUserDID(r) 146 + if userDID == "" { 147 + writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 148 + return 149 + } 150 + 151 + userAccessToken := middleware.GetUserAccessToken(r) 152 + if userAccessToken == "" { 153 + writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 154 + return 155 + } 156 + 157 + // Unblock via service (delete record on PDS) 158 + err := h.service.UnblockCommunity(r.Context(), userDID, userAccessToken, req.Community) 159 + if err != nil { 160 + handleServiceError(w, err) 161 + return 162 + } 163 + 164 + // Return success response 165 + w.Header().Set("Content-Type", "application/json") 166 + w.WriteHeader(http.StatusOK) 167 + if err := json.NewEncoder(w).Encode(map[string]interface{}{ 168 + "success": true, 169 + }); err != nil { 170 + log.Printf("Failed to encode response: %v", err) 171 + } 172 + }
+23 -4
internal/api/handlers/community/subscribe.go
··· 6 6 "encoding/json" 7 7 "log" 8 8 "net/http" 9 + "strings" 9 10 ) 10 11 11 12 // SubscribeHandler handles community subscriptions ··· 22 23 23 24 // HandleSubscribe subscribes a user to a community 24 25 // POST /xrpc/social.coves.community.subscribe 25 - // Body: { "community": "did:plc:xxx" or "!gaming@coves.social" } 26 + // 27 + // Request body: { "community": "did:plc:xxx", "contentVisibility": 3 } 28 + // Note: Per lexicon spec, only DIDs are accepted for the "subject" field (not handles). 26 29 func (h *SubscribeHandler) HandleSubscribe(w http.ResponseWriter, r *http.Request) { 27 30 if r.Method != http.MethodPost { 28 31 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) ··· 31 34 32 35 // Parse request body 33 36 var req struct { 34 - Community string `json:"community"` 37 + Community string `json:"community"` // DID only (per lexicon) 35 38 ContentVisibility int `json:"contentVisibility"` // Optional: 1-5 scale, defaults to 3 36 39 } 37 40 ··· 42 45 43 46 if req.Community == "" { 44 47 writeError(w, http.StatusBadRequest, "InvalidRequest", "community is required") 48 + return 49 + } 50 + 51 + // Validate DID format (per lexicon: subject field requires format "did") 52 + if !strings.HasPrefix(req.Community, "did:") { 53 + writeError(w, http.StatusBadRequest, "InvalidRequest", 54 + "community must be a DID (did:plc:... or did:web:...)") 45 55 return 46 56 } 47 57 ··· 82 92 83 93 // HandleUnsubscribe unsubscribes a user from a community 84 94 // POST /xrpc/social.coves.community.unsubscribe 85 - // Body: { "community": "did:plc:xxx" or "!gaming@coves.social" } 95 + // 96 + // Request body: { "community": "did:plc:xxx" } 97 + // Note: Per lexicon spec, only DIDs are accepted (not handles). 86 98 func (h *SubscribeHandler) HandleUnsubscribe(w http.ResponseWriter, r *http.Request) { 87 99 if r.Method != http.MethodPost { 88 100 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) ··· 91 103 92 104 // Parse request body 93 105 var req struct { 94 - Community string `json:"community"` 106 + Community string `json:"community"` // DID only (per lexicon) 95 107 } 96 108 97 109 if err := json.NewDecoder(r.Body).Decode(&req); err != nil { ··· 101 113 102 114 if req.Community == "" { 103 115 writeError(w, http.StatusBadRequest, "InvalidRequest", "community is required") 116 + return 117 + } 118 + 119 + // Validate DID format (per lexicon: subject field requires format "did") 120 + if !strings.HasPrefix(req.Community, "did:") { 121 + writeError(w, http.StatusBadRequest, "InvalidRequest", 122 + "community must be a DID (did:plc:... or did:web:...)") 104 123 return 105 124 } 106 125
+7
internal/api/routes/community.go
··· 18 18 listHandler := community.NewListHandler(service) 19 19 searchHandler := community.NewSearchHandler(service) 20 20 subscribeHandler := community.NewSubscribeHandler(service) 21 + blockHandler := community.NewBlockHandler(service) 21 22 22 23 // Query endpoints (GET) - public access 23 24 // social.coves.community.get - get a single community by identifier ··· 41 42 42 43 // social.coves.community.unsubscribe - unsubscribe from a community 43 44 r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.community.unsubscribe", subscribeHandler.HandleUnsubscribe) 45 + 46 + // social.coves.community.blockCommunity - block a community 47 + r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.community.blockCommunity", blockHandler.HandleBlock) 48 + 49 + // social.coves.community.unblockCommunity - unblock a community 50 + r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.community.unblockCommunity", blockHandler.HandleUnblock) 44 51 45 52 // TODO: Add delete handler when implemented 46 53 // r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.community.delete", deleteHandler.HandleDelete)
+99 -1
internal/atproto/jetstream/community_consumer.go
··· 1 1 package jetstream 2 2 3 3 import ( 4 + "Coves/internal/atproto/utils" 4 5 "Coves/internal/core/communities" 5 6 "context" 6 7 "encoding/json" ··· 35 36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 36 37 // - social.coves.community.profile: Community profile records (in community's own repo) 37 38 // - social.coves.community.subscription: Subscription records (in user's repo) 39 + // - social.coves.community.block: Block records (in user's repo) 38 40 // 39 41 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 40 42 // that CREATE or DELETE records in these collections ··· 44 46 case "social.coves.community.subscription": 45 47 // Handle both create (subscribe) and delete (unsubscribe) operations 46 48 return c.handleSubscription(ctx, event.Did, commit) 49 + case "social.coves.community.block": 50 + // Handle both create (block) and delete (unblock) operations 51 + return c.handleBlock(ctx, event.Did, commit) 47 52 default: 48 53 // Not a community-related collection 49 54 return nil ··· 267 272 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 268 273 269 274 // Create subscription entity 275 + // Parse createdAt from record to preserve chronological ordering during replays 270 276 subscription := &communities.Subscription{ 271 277 UserDID: userDID, 272 278 CommunityDID: communityDID, 273 279 ContentVisibility: contentVisibility, 274 - SubscribedAt: time.Now(), 280 + SubscribedAt: utils.ParseCreatedAt(commit.Record), 275 281 RecordURI: uri, 276 282 RecordCID: commit.CID, 277 283 } ··· 325 331 } 326 332 327 333 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 334 + return nil 335 + } 336 + 337 + // handleBlock processes block create/delete events 338 + // CREATE operation = user blocked a community 339 + // DELETE operation = user unblocked a community 340 + func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 341 + switch commit.Operation { 342 + case "create": 343 + return c.createBlock(ctx, userDID, commit) 344 + case "delete": 345 + return c.deleteBlock(ctx, userDID, commit) 346 + default: 347 + // Update operations shouldn't happen on blocks, but ignore gracefully 348 + log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 349 + commit.Operation, userDID, commit.RKey) 350 + return nil 351 + } 352 + } 353 + 354 + // createBlock indexes a new block 355 + func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 356 + if commit.Record == nil { 357 + return fmt.Errorf("block create event missing record data") 358 + } 359 + 360 + // Extract community DID from record's subject field (following atProto conventions) 361 + communityDID, ok := commit.Record["subject"].(string) 362 + if !ok { 363 + return fmt.Errorf("block record missing subject field") 364 + } 365 + 366 + // Build AT-URI for block record 367 + // The record lives in the USER's repository 368 + uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 369 + 370 + // Create block entity 371 + // Parse createdAt from record to preserve chronological ordering during replays 372 + block := &communities.CommunityBlock{ 373 + UserDID: userDID, 374 + CommunityDID: communityDID, 375 + BlockedAt: utils.ParseCreatedAt(commit.Record), 376 + RecordURI: uri, 377 + RecordCID: commit.CID, 378 + } 379 + 380 + // Index the block 381 + // This is idempotent - safe for Jetstream replays 382 + _, err := c.repo.BlockCommunity(ctx, block) 383 + if err != nil { 384 + // If already exists, that's fine (idempotency) 385 + if communities.IsConflict(err) { 386 + log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 387 + return nil 388 + } 389 + return fmt.Errorf("failed to index block: %w", err) 390 + } 391 + 392 + log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 393 + return nil 394 + } 395 + 396 + // deleteBlock removes a block from the index 397 + // DELETE operations don't include record data, so we need to look up the block 398 + // by its URI to find which community the user unblocked 399 + func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 400 + // Build AT-URI from the rkey 401 + uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 402 + 403 + // Look up the block to get the community DID 404 + // (DELETE operations don't include record data in Jetstream) 405 + block, err := c.repo.GetBlockByURI(ctx, uri) 406 + if err != nil { 407 + if communities.IsNotFound(err) { 408 + // Already deleted - this is fine (idempotency) 409 + log.Printf("Block already deleted: %s", uri) 410 + return nil 411 + } 412 + return fmt.Errorf("failed to find block for deletion: %w", err) 413 + } 414 + 415 + // Remove the block from the index 416 + err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 417 + if err != nil { 418 + if communities.IsNotFound(err) { 419 + log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 420 + return nil 421 + } 422 + return fmt.Errorf("failed to remove block: %w", err) 423 + } 424 + 425 + log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 328 426 return nil 329 427 } 330 428
+27
internal/atproto/lexicon/social/coves/community/block.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "social.coves.community.block", 4 + "defs": { 5 + "main": { 6 + "type": "record", 7 + "description": "Record declaring a block relationship against a community. Blocks are public.", 8 + "key": "tid", 9 + "record": { 10 + "type": "object", 11 + "required": ["subject", "createdAt"], 12 + "properties": { 13 + "subject": { 14 + "type": "string", 15 + "format": "did", 16 + "description": "DID of the community being blocked" 17 + }, 18 + "createdAt": { 19 + "type": "string", 20 + "format": "datetime", 21 + "description": "When the block was created" 22 + } 23 + } 24 + } 25 + } 26 + } 27 + }
+2 -2
internal/atproto/lexicon/social/coves/community/subscription.json
··· 12 12 "properties": { 13 13 "subject": { 14 14 "type": "string", 15 - "format": "at-identifier", 16 - "description": "DID or handle of the community being subscribed to" 15 + "format": "did", 16 + "description": "DID of the community being subscribed to" 17 17 }, 18 18 "createdAt": { 19 19 "type": "string",
+49
internal/atproto/utils/record_utils.go
··· 1 + package utils 2 + 3 + import ( 4 + "database/sql" 5 + "strings" 6 + "time" 7 + ) 8 + 9 + // ExtractRKeyFromURI extracts the record key from an AT-URI 10 + // Format: at://did/collection/rkey -> rkey 11 + func ExtractRKeyFromURI(uri string) string { 12 + parts := strings.Split(uri, "/") 13 + if len(parts) >= 4 { 14 + return parts[len(parts)-1] 15 + } 16 + return "" 17 + } 18 + 19 + // StringFromNull converts sql.NullString to string 20 + // Returns empty string if the NullString is not valid 21 + func StringFromNull(ns sql.NullString) string { 22 + if ns.Valid { 23 + return ns.String 24 + } 25 + return "" 26 + } 27 + 28 + // ParseCreatedAt extracts and parses the createdAt timestamp from an atProto record 29 + // Falls back to time.Now() if the field is missing or invalid 30 + // This preserves chronological ordering during Jetstream replays and backfills 31 + func ParseCreatedAt(record map[string]interface{}) time.Time { 32 + if record == nil { 33 + return time.Now() 34 + } 35 + 36 + createdAtStr, ok := record["createdAt"].(string) 37 + if !ok || createdAtStr == "" { 38 + return time.Now() 39 + } 40 + 41 + // atProto uses RFC3339 format for datetime fields 42 + createdAt, err := time.Parse(time.RFC3339, createdAtStr) 43 + if err != nil { 44 + // Fallback to now if parsing fails 45 + return time.Now() 46 + } 47 + 48 + return createdAt 49 + }
+11
internal/core/communities/community.go
··· 52 52 ID int `json:"id" db:"id"` 53 53 } 54 54 55 + // CommunityBlock represents a user blocking a community 56 + // Block records live in the user's repository (at://user_did/social.coves.community.block/{rkey}) 57 + type CommunityBlock struct { 58 + BlockedAt time.Time `json:"blockedAt" db:"blocked_at"` 59 + UserDID string `json:"userDid" db:"user_did"` 60 + CommunityDID string `json:"communityDid" db:"community_did"` 61 + RecordURI string `json:"recordUri,omitempty" db:"record_uri"` 62 + RecordCID string `json:"recordCid,omitempty" db:"record_cid"` 63 + ID int `json:"id" db:"id"` 64 + } 65 + 55 66 // Membership represents active participation with reputation tracking 56 67 type Membership struct { 57 68 JoinedAt time.Time `json:"joinedAt" db:"joined_at"`
+9 -1
internal/core/communities/errors.go
··· 31 31 // ErrSubscriptionNotFound is returned when subscription doesn't exist 32 32 ErrSubscriptionNotFound = errors.New("subscription not found") 33 33 34 + // ErrBlockNotFound is returned when block doesn't exist 35 + ErrBlockNotFound = errors.New("block not found") 36 + 37 + // ErrBlockAlreadyExists is returned when user has already blocked the community 38 + ErrBlockAlreadyExists = errors.New("community already blocked") 39 + 34 40 // ErrMembershipNotFound is returned when membership doesn't exist 35 41 ErrMembershipNotFound = errors.New("membership not found") 36 42 ··· 63 69 func IsNotFound(err error) bool { 64 70 return errors.Is(err, ErrCommunityNotFound) || 65 71 errors.Is(err, ErrSubscriptionNotFound) || 72 + errors.Is(err, ErrBlockNotFound) || 66 73 errors.Is(err, ErrMembershipNotFound) 67 74 } 68 75 ··· 70 77 func IsConflict(err error) bool { 71 78 return errors.Is(err, ErrCommunityAlreadyExists) || 72 79 errors.Is(err, ErrHandleTaken) || 73 - errors.Is(err, ErrSubscriptionAlreadyExists) 80 + errors.Is(err, ErrSubscriptionAlreadyExists) || 81 + errors.Is(err, ErrBlockAlreadyExists) 74 82 } 75 83 76 84 // IsValidationError checks if error is a validation error
+14
internal/core/communities/interfaces.go
··· 26 26 ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) 27 27 ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*Subscription, error) 28 28 29 + // Community Blocks 30 + BlockCommunity(ctx context.Context, block *CommunityBlock) (*CommunityBlock, error) 31 + UnblockCommunity(ctx context.Context, userDID, communityDID string) error 32 + GetBlock(ctx context.Context, userDID, communityDID string) (*CommunityBlock, error) 33 + GetBlockByURI(ctx context.Context, recordURI string) (*CommunityBlock, error) // For Jetstream delete operations 34 + ListBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) 35 + IsBlocked(ctx context.Context, userDID, communityDID string) (bool, error) 36 + 29 37 // Memberships (active participation with reputation) 30 38 CreateMembership(ctx context.Context, membership *Membership) (*Membership, error) 31 39 GetMembership(ctx context.Context, userDID, communityDID string) (*Membership, error) ··· 59 67 UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error 60 68 GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) 61 69 GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) 70 + 71 + // Block operations (write-forward: creates record in user's PDS) 72 + BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) 73 + UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error 74 + GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) 75 + IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) 62 76 63 77 // Membership operations (indexed from firehose, reputation managed internally) 64 78 GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error)
+140 -46
internal/core/communities/service.go
··· 1 1 package communities 2 2 3 3 import ( 4 + "Coves/internal/atproto/utils" 4 5 "bytes" 5 6 "context" 6 7 "encoding/json" 8 + "errors" 7 9 "fmt" 8 10 "io" 9 11 "log" ··· 455 457 } 456 458 457 459 // Extract rkey from record URI (at://did/collection/rkey) 458 - rkey := extractRKeyFromURI(subscription.RecordURI) 460 + rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 459 461 if rkey == "" { 460 462 return fmt.Errorf("invalid subscription record URI") 461 463 } ··· 516 518 return s.repo.ListMembers(ctx, communityDID, limit, offset) 517 519 } 518 520 521 + // BlockCommunity blocks a community via write-forward to PDS 522 + func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 523 + if userDID == "" { 524 + return nil, NewValidationError("userDid", "required") 525 + } 526 + if userAccessToken == "" { 527 + return nil, NewValidationError("userAccessToken", "required") 528 + } 529 + 530 + // Resolve community identifier (also verifies community exists) 531 + communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 532 + if err != nil { 533 + return nil, err 534 + } 535 + 536 + // Build block record 537 + // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 538 + // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 539 + // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 540 + blockRecord := map[string]interface{}{ 541 + "$type": "social.coves.community.block", 542 + "subject": communityDID, // DID of community being blocked 543 + "createdAt": time.Now().Format(time.RFC3339), 544 + } 545 + 546 + // Write-forward: create block record in user's repo using their access token 547 + // Note: We don't check for existing blocks first because: 548 + // 1. The PDS may reject duplicates (depending on implementation) 549 + // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 550 + // 3. This avoids a race condition where two concurrent requests both pass the check 551 + recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 552 + if err != nil { 553 + // Check if this is a duplicate/conflict error from PDS 554 + // PDS should return 409 Conflict for duplicate records, but we also check common error messages 555 + // for compatibility with different PDS implementations 556 + errMsg := err.Error() 557 + isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 558 + strings.Contains(errMsg, "duplicate") || 559 + strings.Contains(errMsg, "already exists") || 560 + strings.Contains(errMsg, "AlreadyExists") 561 + 562 + if isDuplicate { 563 + // Fetch and return existing block from our indexed view 564 + existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 565 + if getErr == nil { 566 + // Block exists in our index - return it 567 + return existingBlock, nil 568 + } 569 + // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 570 + // Any other error (DB outage, connection failure, etc.) should bubble up 571 + if errors.Is(getErr, ErrBlockNotFound) { 572 + // Race condition: PDS has the block but Jetstream hasn't indexed it yet 573 + // Return typed conflict error so handler can return 409 instead of 500 574 + // This is normal in eventually-consistent systems 575 + return nil, ErrBlockAlreadyExists 576 + } 577 + // Real datastore error - bubble it up so operators see the failure 578 + return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 579 + } 580 + return nil, fmt.Errorf("failed to create block on PDS: %w", err) 581 + } 582 + 583 + // Return block representation 584 + block := &CommunityBlock{ 585 + UserDID: userDID, 586 + CommunityDID: communityDID, 587 + BlockedAt: time.Now(), 588 + RecordURI: recordURI, 589 + RecordCID: recordCID, 590 + } 591 + 592 + return block, nil 593 + } 594 + 595 + // UnblockCommunity removes a block via PDS delete 596 + func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 597 + if userDID == "" { 598 + return NewValidationError("userDid", "required") 599 + } 600 + if userAccessToken == "" { 601 + return NewValidationError("userAccessToken", "required") 602 + } 603 + 604 + // Resolve community identifier 605 + communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 606 + if err != nil { 607 + return err 608 + } 609 + 610 + // Get the block from AppView to find the record key 611 + block, err := s.repo.GetBlock(ctx, userDID, communityDID) 612 + if err != nil { 613 + return err 614 + } 615 + 616 + // Extract rkey from record URI (at://did/collection/rkey) 617 + rkey := utils.ExtractRKeyFromURI(block.RecordURI) 618 + if rkey == "" { 619 + return fmt.Errorf("invalid block record URI") 620 + } 621 + 622 + // Write-forward: delete record from PDS using user's access token 623 + if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 624 + return fmt.Errorf("failed to delete block on PDS: %w", err) 625 + } 626 + 627 + return nil 628 + } 629 + 630 + // GetBlockedCommunities queries AppView DB for user's blocks 631 + func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 632 + if limit <= 0 || limit > 100 { 633 + limit = 50 634 + } 635 + 636 + return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 637 + } 638 + 639 + // IsBlocked checks if a user has blocked a community 640 + func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 641 + communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 642 + if err != nil { 643 + return false, err 644 + } 645 + 646 + return s.repo.IsBlocked(ctx, userDID, communityDID) 647 + } 648 + 519 649 // ValidateHandle checks if a community handle is valid 520 650 func (s *communityService) ValidateHandle(handle string) error { 521 651 if handle == "" { ··· 535 665 return "", ErrInvalidInput 536 666 } 537 667 538 - // If it's already a DID, return it 668 + // If it's already a DID, verify the community exists 539 669 if strings.HasPrefix(identifier, "did:") { 670 + _, err := s.repo.GetByDID(ctx, identifier) 671 + if err != nil { 672 + if IsNotFound(err) { 673 + return "", fmt.Errorf("community not found: %w", err) 674 + } 675 + return "", fmt.Errorf("failed to verify community DID: %w", err) 676 + } 540 677 return identifier, nil 541 678 } 542 679 ··· 594 731 595 732 // PDS write-forward helpers 596 733 597 - func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) { 598 - endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 599 - 600 - payload := map[string]interface{}{ 601 - "repo": repoDID, 602 - "collection": collection, 603 - "record": record, 604 - } 605 - 606 - if rkey != "" { 607 - payload["rkey"] = rkey 608 - } 609 - 610 - return s.callPDS(ctx, "POST", endpoint, payload) 611 - } 612 - 613 734 // createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 614 735 func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 615 736 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) ··· 641 762 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 642 763 } 643 764 644 - func (s *communityService) deleteRecordOnPDS(ctx context.Context, repoDID, collection, rkey string) error { 645 - endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 646 - 647 - payload := map[string]interface{}{ 648 - "repo": repoDID, 649 - "collection": collection, 650 - "rkey": rkey, 651 - } 652 - 653 - _, _, err := s.callPDS(ctx, "POST", endpoint, payload) 654 - return err 655 - } 656 - 657 765 // deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 658 - func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, accessToken string) error { 766 + func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 659 767 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 660 768 661 769 payload := map[string]interface{}{ ··· 666 774 667 775 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 668 776 return err 669 - } 670 - 671 - func (s *communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) { 672 - // Use instance's access token 673 - return s.callPDSWithAuth(ctx, method, endpoint, payload, s.pdsAccessToken) 674 777 } 675 778 676 779 // callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) ··· 740 843 } 741 844 742 845 // Helper functions 743 - 744 - func extractRKeyFromURI(uri string) string { 745 - // at://did/collection/rkey -> rkey 746 - parts := strings.Split(uri, "/") 747 - if len(parts) >= 4 { 748 - return parts[len(parts)-1] 749 - } 750 - return "" 751 - }
+28
internal/db/migrations/009_create_community_blocks_table.sql
··· 1 + -- +goose Up 2 + CREATE TABLE community_blocks ( 3 + id SERIAL PRIMARY KEY, 4 + user_did TEXT NOT NULL CHECK (user_did ~ '^did:(plc|web):[a-zA-Z0-9._:%-]+$'), 5 + community_did TEXT NOT NULL CHECK (community_did ~ '^did:(plc|web):[a-zA-Z0-9._:%-]+$'), 6 + blocked_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, 7 + 8 + -- AT-Proto metadata (block record lives in user's repo) 9 + -- These are required for atProto record verification and federation 10 + record_uri TEXT NOT NULL, -- atProto record identifier (at://user_did/social.coves.community.block/rkey) 11 + record_cid TEXT NOT NULL, -- Content address (critical for verification) 12 + 13 + UNIQUE(user_did, community_did) 14 + ); 15 + 16 + -- Indexes for efficient queries 17 + -- Note: UNIQUE constraint on (user_did, community_did) already creates an index for those columns 18 + CREATE INDEX idx_blocks_user ON community_blocks(user_did); 19 + CREATE INDEX idx_blocks_community ON community_blocks(community_did); 20 + CREATE INDEX idx_blocks_record_uri ON community_blocks(record_uri); -- For GetBlockByURI (Jetstream DELETE operations) 21 + CREATE INDEX idx_blocks_blocked_at ON community_blocks(blocked_at); 22 + 23 + -- +goose Down 24 + DROP INDEX IF EXISTS idx_blocks_blocked_at; 25 + DROP INDEX IF EXISTS idx_blocks_record_uri; 26 + DROP INDEX IF EXISTS idx_blocks_community; 27 + DROP INDEX IF EXISTS idx_blocks_user; 28 + DROP TABLE IF EXISTS community_blocks;
+173
internal/db/postgres/community_repo_blocks.go
··· 1 + package postgres 2 + 3 + import ( 4 + "Coves/internal/core/communities" 5 + "context" 6 + "database/sql" 7 + "fmt" 8 + "log" 9 + ) 10 + 11 + // BlockCommunity creates a new block record (idempotent) 12 + func (r *postgresCommunityRepo) BlockCommunity(ctx context.Context, block *communities.CommunityBlock) (*communities.CommunityBlock, error) { 13 + query := ` 14 + INSERT INTO community_blocks (user_did, community_did, blocked_at, record_uri, record_cid) 15 + VALUES ($1, $2, $3, $4, $5) 16 + ON CONFLICT (user_did, community_did) DO UPDATE SET 17 + record_uri = EXCLUDED.record_uri, 18 + record_cid = EXCLUDED.record_cid, 19 + blocked_at = EXCLUDED.blocked_at 20 + RETURNING id, blocked_at` 21 + 22 + err := r.db.QueryRowContext(ctx, query, 23 + block.UserDID, 24 + block.CommunityDID, 25 + block.BlockedAt, 26 + block.RecordURI, 27 + block.RecordCID, 28 + ).Scan(&block.ID, &block.BlockedAt) 29 + if err != nil { 30 + return nil, fmt.Errorf("failed to create block: %w", err) 31 + } 32 + 33 + return block, nil 34 + } 35 + 36 + // UnblockCommunity removes a block record 37 + func (r *postgresCommunityRepo) UnblockCommunity(ctx context.Context, userDID, communityDID string) error { 38 + query := `DELETE FROM community_blocks WHERE user_did = $1 AND community_did = $2` 39 + 40 + result, err := r.db.ExecContext(ctx, query, userDID, communityDID) 41 + if err != nil { 42 + return fmt.Errorf("failed to unblock community: %w", err) 43 + } 44 + 45 + rowsAffected, err := result.RowsAffected() 46 + if err != nil { 47 + return fmt.Errorf("failed to check unblock result: %w", err) 48 + } 49 + 50 + if rowsAffected == 0 { 51 + return communities.ErrBlockNotFound 52 + } 53 + 54 + return nil 55 + } 56 + 57 + // GetBlock retrieves a block record by user DID and community DID 58 + func (r *postgresCommunityRepo) GetBlock(ctx context.Context, userDID, communityDID string) (*communities.CommunityBlock, error) { 59 + query := ` 60 + SELECT id, user_did, community_did, blocked_at, record_uri, record_cid 61 + FROM community_blocks 62 + WHERE user_did = $1 AND community_did = $2` 63 + 64 + var block communities.CommunityBlock 65 + 66 + err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan( 67 + &block.ID, 68 + &block.UserDID, 69 + &block.CommunityDID, 70 + &block.BlockedAt, 71 + &block.RecordURI, 72 + &block.RecordCID, 73 + ) 74 + if err != nil { 75 + if err == sql.ErrNoRows { 76 + return nil, communities.ErrBlockNotFound 77 + } 78 + return nil, fmt.Errorf("failed to get block: %w", err) 79 + } 80 + 81 + return &block, nil 82 + } 83 + 84 + // GetBlockByURI retrieves a block record by its AT-URI (for Jetstream DELETE operations) 85 + func (r *postgresCommunityRepo) GetBlockByURI(ctx context.Context, recordURI string) (*communities.CommunityBlock, error) { 86 + query := ` 87 + SELECT id, user_did, community_did, blocked_at, record_uri, record_cid 88 + FROM community_blocks 89 + WHERE record_uri = $1` 90 + 91 + var block communities.CommunityBlock 92 + 93 + err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 94 + &block.ID, 95 + &block.UserDID, 96 + &block.CommunityDID, 97 + &block.BlockedAt, 98 + &block.RecordURI, 99 + &block.RecordCID, 100 + ) 101 + if err != nil { 102 + if err == sql.ErrNoRows { 103 + return nil, communities.ErrBlockNotFound 104 + } 105 + return nil, fmt.Errorf("failed to get block by URI: %w", err) 106 + } 107 + 108 + return &block, nil 109 + } 110 + 111 + // ListBlockedCommunities retrieves all communities blocked by a user 112 + func (r *postgresCommunityRepo) ListBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*communities.CommunityBlock, error) { 113 + query := ` 114 + SELECT id, user_did, community_did, blocked_at, record_uri, record_cid 115 + FROM community_blocks 116 + WHERE user_did = $1 117 + ORDER BY blocked_at DESC 118 + LIMIT $2 OFFSET $3` 119 + 120 + rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset) 121 + if err != nil { 122 + return nil, fmt.Errorf("failed to list blocked communities: %w", err) 123 + } 124 + defer func() { 125 + if closeErr := rows.Close(); closeErr != nil { 126 + // Log error but don't override the main error 127 + log.Printf("Failed to close rows: %v", closeErr) 128 + } 129 + }() 130 + 131 + var blocks []*communities.CommunityBlock 132 + for rows.Next() { 133 + // Allocate a new block for each iteration to avoid pointer reuse bug 134 + block := &communities.CommunityBlock{} 135 + 136 + err = rows.Scan( 137 + &block.ID, 138 + &block.UserDID, 139 + &block.CommunityDID, 140 + &block.BlockedAt, 141 + &block.RecordURI, 142 + &block.RecordCID, 143 + ) 144 + if err != nil { 145 + return nil, fmt.Errorf("failed to scan block: %w", err) 146 + } 147 + 148 + blocks = append(blocks, block) 149 + } 150 + 151 + if err = rows.Err(); err != nil { 152 + return nil, fmt.Errorf("error iterating blocks: %w", err) 153 + } 154 + 155 + return blocks, nil 156 + } 157 + 158 + // IsBlocked checks if a user has blocked a specific community (fast EXISTS check) 159 + func (r *postgresCommunityRepo) IsBlocked(ctx context.Context, userDID, communityDID string) (bool, error) { 160 + query := ` 161 + SELECT EXISTS( 162 + SELECT 1 FROM community_blocks 163 + WHERE user_did = $1 AND community_did = $2 164 + )` 165 + 166 + var exists bool 167 + err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(&exists) 168 + if err != nil { 169 + return false, fmt.Errorf("failed to check if blocked: %w", err) 170 + } 171 + 172 + return exists, nil 173 + }
+470
tests/integration/community_blocking_test.go
··· 1 + package integration 2 + 3 + import ( 4 + "Coves/internal/atproto/jetstream" 5 + "Coves/internal/core/communities" 6 + postgresRepo "Coves/internal/db/postgres" 7 + "context" 8 + "database/sql" 9 + "fmt" 10 + "testing" 11 + "time" 12 + ) 13 + 14 + // TestCommunityBlocking_Indexing tests Jetstream indexing of block events 15 + func TestCommunityBlocking_Indexing(t *testing.T) { 16 + if testing.Short() { 17 + t.Skip("Skipping integration test in short mode") 18 + } 19 + 20 + ctx := context.Background() 21 + db := setupTestDB(t) 22 + defer cleanupBlockingTestDB(t, db) 23 + 24 + repo := createBlockingTestCommunityRepo(t, db) 25 + consumer := jetstream.NewCommunityEventConsumer(repo) 26 + 27 + // Create test community 28 + testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 29 + community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 30 + 31 + t.Run("indexes block CREATE event", func(t *testing.T) { 32 + userDID := "did:plc:test-user-blocker" 33 + rkey := "test-block-1" 34 + 35 + // Simulate Jetstream CREATE event 36 + event := &jetstream.JetstreamEvent{ 37 + Did: userDID, 38 + Kind: "commit", 39 + TimeUS: time.Now().UnixMicro(), 40 + Commit: &jetstream.CommitEvent{ 41 + Rev: "test-rev-1", 42 + Operation: "create", 43 + Collection: "social.coves.community.block", 44 + RKey: rkey, 45 + CID: "bafyblock123", 46 + Record: map[string]interface{}{ 47 + "$type": "social.coves.community.block", 48 + "subject": community.DID, 49 + "createdAt": time.Now().Format(time.RFC3339), 50 + }, 51 + }, 52 + } 53 + 54 + // Process event 55 + err := consumer.HandleEvent(ctx, event) 56 + if err != nil { 57 + t.Fatalf("Failed to handle block event: %v", err) 58 + } 59 + 60 + // Verify block indexed 61 + block, err := repo.GetBlock(ctx, userDID, community.DID) 62 + if err != nil { 63 + t.Fatalf("Failed to get block: %v", err) 64 + } 65 + 66 + if block.UserDID != userDID { 67 + t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 68 + } 69 + if block.CommunityDID != community.DID { 70 + t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 71 + } 72 + 73 + // Verify IsBlocked works 74 + isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 75 + if err != nil { 76 + t.Fatalf("IsBlocked failed: %v", err) 77 + } 78 + if !isBlocked { 79 + t.Error("Expected IsBlocked=true, got false") 80 + } 81 + }) 82 + 83 + t.Run("indexes block DELETE event", func(t *testing.T) { 84 + userDID := "did:plc:test-user-unblocker" 85 + rkey := "test-block-2" 86 + uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 87 + 88 + // First create a block 89 + block := &communities.CommunityBlock{ 90 + UserDID: userDID, 91 + CommunityDID: community.DID, 92 + BlockedAt: time.Now(), 93 + RecordURI: uri, 94 + RecordCID: "bafyblock456", 95 + } 96 + _, err := repo.BlockCommunity(ctx, block) 97 + if err != nil { 98 + t.Fatalf("Failed to create block: %v", err) 99 + } 100 + 101 + // Simulate DELETE event 102 + event := &jetstream.JetstreamEvent{ 103 + Did: userDID, 104 + Kind: "commit", 105 + TimeUS: time.Now().UnixMicro(), 106 + Commit: &jetstream.CommitEvent{ 107 + Rev: "test-rev-2", 108 + Operation: "delete", 109 + Collection: "social.coves.community.block", 110 + RKey: rkey, 111 + }, 112 + } 113 + 114 + // Process delete 115 + err = consumer.HandleEvent(ctx, event) 116 + if err != nil { 117 + t.Fatalf("Failed to handle delete event: %v", err) 118 + } 119 + 120 + // Verify block removed 121 + _, err = repo.GetBlock(ctx, userDID, community.DID) 122 + if !communities.IsNotFound(err) { 123 + t.Error("Expected block to be deleted") 124 + } 125 + 126 + // Verify IsBlocked returns false 127 + isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 128 + if err != nil { 129 + t.Fatalf("IsBlocked failed: %v", err) 130 + } 131 + if isBlocked { 132 + t.Error("Expected IsBlocked=false, got true") 133 + } 134 + }) 135 + 136 + t.Run("block is idempotent", func(t *testing.T) { 137 + userDID := "did:plc:test-user-idempotent" 138 + rkey := "test-block-3" 139 + 140 + event := &jetstream.JetstreamEvent{ 141 + Did: userDID, 142 + Kind: "commit", 143 + TimeUS: time.Now().UnixMicro(), 144 + Commit: &jetstream.CommitEvent{ 145 + Rev: "test-rev-3", 146 + Operation: "create", 147 + Collection: "social.coves.community.block", 148 + RKey: rkey, 149 + CID: "bafyblock789", 150 + Record: map[string]interface{}{ 151 + "$type": "social.coves.community.block", 152 + "subject": community.DID, 153 + "createdAt": time.Now().Format(time.RFC3339), 154 + }, 155 + }, 156 + } 157 + 158 + // Process event twice 159 + err := consumer.HandleEvent(ctx, event) 160 + if err != nil { 161 + t.Fatalf("First block failed: %v", err) 162 + } 163 + 164 + err = consumer.HandleEvent(ctx, event) 165 + if err != nil { 166 + t.Fatalf("Second block (idempotent) failed: %v", err) 167 + } 168 + 169 + // Should still exist only once 170 + blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 171 + if err != nil { 172 + t.Fatalf("ListBlockedCommunities failed: %v", err) 173 + } 174 + if len(blocks) != 1 { 175 + t.Errorf("Expected 1 block, got %d", len(blocks)) 176 + } 177 + }) 178 + 179 + t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 180 + userDID := "did:plc:test-user-nonexistent" 181 + rkey := "test-block-nonexistent" 182 + 183 + // Simulate DELETE event for block that doesn't exist 184 + event := &jetstream.JetstreamEvent{ 185 + Did: userDID, 186 + Kind: "commit", 187 + TimeUS: time.Now().UnixMicro(), 188 + Commit: &jetstream.CommitEvent{ 189 + Rev: "test-rev-99", 190 + Operation: "delete", 191 + Collection: "social.coves.community.block", 192 + RKey: rkey, 193 + }, 194 + } 195 + 196 + // Should not error (idempotent) 197 + err := consumer.HandleEvent(ctx, event) 198 + if err != nil { 199 + t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 200 + } 201 + }) 202 + } 203 + 204 + // TestCommunityBlocking_ListBlocked tests listing blocked communities 205 + func TestCommunityBlocking_ListBlocked(t *testing.T) { 206 + if testing.Short() { 207 + t.Skip("Skipping integration test in short mode") 208 + } 209 + 210 + ctx := context.Background() 211 + db := setupTestDB(t) 212 + defer cleanupBlockingTestDB(t, db) 213 + 214 + repo := createBlockingTestCommunityRepo(t, db) 215 + userDID := "did:plc:test-user-list" 216 + 217 + // Create and block 3 communities 218 + testCommunities := make([]*communities.Community, 3) 219 + for i := 0; i < 3; i++ { 220 + communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 221 + testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 222 + 223 + block := &communities.CommunityBlock{ 224 + UserDID: userDID, 225 + CommunityDID: testCommunities[i].DID, 226 + BlockedAt: time.Now(), 227 + RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 228 + RecordCID: fmt.Sprintf("bafyblock%d", i), 229 + } 230 + _, err := repo.BlockCommunity(ctx, block) 231 + if err != nil { 232 + t.Fatalf("Failed to block community %d: %v", i, err) 233 + } 234 + } 235 + 236 + t.Run("lists all blocked communities", func(t *testing.T) { 237 + blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 238 + if err != nil { 239 + t.Fatalf("ListBlockedCommunities failed: %v", err) 240 + } 241 + 242 + if len(blocks) != 3 { 243 + t.Errorf("Expected 3 blocks, got %d", len(blocks)) 244 + } 245 + 246 + // Verify all blocks belong to correct user 247 + for _, block := range blocks { 248 + if block.UserDID != userDID { 249 + t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 250 + } 251 + } 252 + }) 253 + 254 + t.Run("pagination works correctly", func(t *testing.T) { 255 + // Get first 2 256 + blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 257 + if err != nil { 258 + t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 259 + } 260 + if len(blocks) != 2 { 261 + t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 262 + } 263 + 264 + // Get next 2 (should only get 1) 265 + blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 266 + if err != nil { 267 + t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 268 + } 269 + if len(blocksPage2) != 1 { 270 + t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2)) 271 + } 272 + }) 273 + 274 + t.Run("returns empty list for user with no blocks", func(t *testing.T) { 275 + blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 276 + if err != nil { 277 + t.Fatalf("ListBlockedCommunities failed: %v", err) 278 + } 279 + if len(blocks) != 0 { 280 + t.Errorf("Expected 0 blocks, got %d", len(blocks)) 281 + } 282 + }) 283 + } 284 + 285 + // TestCommunityBlocking_IsBlocked tests the fast block check 286 + func TestCommunityBlocking_IsBlocked(t *testing.T) { 287 + if testing.Short() { 288 + t.Skip("Skipping integration test in short mode") 289 + } 290 + 291 + ctx := context.Background() 292 + db := setupTestDB(t) 293 + defer cleanupBlockingTestDB(t, db) 294 + 295 + repo := createBlockingTestCommunityRepo(t, db) 296 + 297 + userDID := "did:plc:test-user-isblocked" 298 + communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 299 + community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 300 + 301 + t.Run("returns false when not blocked", func(t *testing.T) { 302 + isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 303 + if err != nil { 304 + t.Fatalf("IsBlocked failed: %v", err) 305 + } 306 + if isBlocked { 307 + t.Error("Expected IsBlocked=false, got true") 308 + } 309 + }) 310 + 311 + t.Run("returns true when blocked", func(t *testing.T) { 312 + // Create block 313 + block := &communities.CommunityBlock{ 314 + UserDID: userDID, 315 + CommunityDID: community.DID, 316 + BlockedAt: time.Now(), 317 + RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 318 + RecordCID: "bafyblocktest", 319 + } 320 + _, err := repo.BlockCommunity(ctx, block) 321 + if err != nil { 322 + t.Fatalf("Failed to create block: %v", err) 323 + } 324 + 325 + // Check IsBlocked 326 + isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 327 + if err != nil { 328 + t.Fatalf("IsBlocked failed: %v", err) 329 + } 330 + if !isBlocked { 331 + t.Error("Expected IsBlocked=true, got false") 332 + } 333 + }) 334 + 335 + t.Run("returns false after unblock", func(t *testing.T) { 336 + // Unblock 337 + err := repo.UnblockCommunity(ctx, userDID, community.DID) 338 + if err != nil { 339 + t.Fatalf("UnblockCommunity failed: %v", err) 340 + } 341 + 342 + // Check IsBlocked 343 + isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 344 + if err != nil { 345 + t.Fatalf("IsBlocked failed: %v", err) 346 + } 347 + if isBlocked { 348 + t.Error("Expected IsBlocked=false after unblock, got true") 349 + } 350 + }) 351 + } 352 + 353 + // TestCommunityBlocking_GetBlock tests block retrieval 354 + func TestCommunityBlocking_GetBlock(t *testing.T) { 355 + if testing.Short() { 356 + t.Skip("Skipping integration test in short mode") 357 + } 358 + 359 + ctx := context.Background() 360 + db := setupTestDB(t) 361 + defer cleanupBlockingTestDB(t, db) 362 + 363 + repo := createBlockingTestCommunityRepo(t, db) 364 + 365 + userDID := "did:plc:test-user-getblock" 366 + communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 367 + community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 368 + 369 + t.Run("returns error when block doesn't exist", func(t *testing.T) { 370 + _, err := repo.GetBlock(ctx, userDID, community.DID) 371 + if !communities.IsNotFound(err) { 372 + t.Errorf("Expected ErrBlockNotFound, got: %v", err) 373 + } 374 + }) 375 + 376 + t.Run("retrieves block by user and community DID", func(t *testing.T) { 377 + // Create block 378 + recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 379 + originalBlock := &communities.CommunityBlock{ 380 + UserDID: userDID, 381 + CommunityDID: community.DID, 382 + BlockedAt: time.Now(), 383 + RecordURI: recordURI, 384 + RecordCID: "bafyblockgettest", 385 + } 386 + _, err := repo.BlockCommunity(ctx, originalBlock) 387 + if err != nil { 388 + t.Fatalf("Failed to create block: %v", err) 389 + } 390 + 391 + // Retrieve by user+community 392 + block, err := repo.GetBlock(ctx, userDID, community.DID) 393 + if err != nil { 394 + t.Fatalf("GetBlock failed: %v", err) 395 + } 396 + 397 + if block.UserDID != userDID { 398 + t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 399 + } 400 + if block.CommunityDID != community.DID { 401 + t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 402 + } 403 + if block.RecordURI != recordURI { 404 + t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 405 + } 406 + }) 407 + 408 + t.Run("retrieves block by URI", func(t *testing.T) { 409 + recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 410 + 411 + // Retrieve by URI 412 + block, err := repo.GetBlockByURI(ctx, recordURI) 413 + if err != nil { 414 + t.Fatalf("GetBlockByURI failed: %v", err) 415 + } 416 + 417 + if block.RecordURI != recordURI { 418 + t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 419 + } 420 + if block.CommunityDID != community.DID { 421 + t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 422 + } 423 + }) 424 + } 425 + 426 + // Helper functions for blocking tests 427 + 428 + func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 429 + return postgresRepo.NewCommunityRepository(db) 430 + } 431 + 432 + func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 433 + community := &communities.Community{ 434 + DID: did, 435 + Handle: fmt.Sprintf("!%s@coves.test", name), 436 + Name: name, 437 + DisplayName: fmt.Sprintf("Test Community %s", name), 438 + Description: "Test community for blocking tests", 439 + OwnerDID: did, 440 + CreatedByDID: "did:plc:test-creator", 441 + HostedByDID: "did:plc:test-instance", 442 + Visibility: "public", 443 + CreatedAt: time.Now(), 444 + UpdatedAt: time.Now(), 445 + } 446 + 447 + created, err := repo.Create(context.Background(), community) 448 + if err != nil { 449 + t.Fatalf("Failed to create test community: %v", err) 450 + } 451 + 452 + return created 453 + } 454 + 455 + func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 456 + // Clean up test data 457 + _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'") 458 + if err != nil { 459 + t.Logf("Warning: Failed to clean up blocks: %v", err) 460 + } 461 + 462 + _, err = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 463 + if err != nil { 464 + t.Logf("Warning: Failed to clean up communities: %v", err) 465 + } 466 + 467 + if closeErr := db.Close(); closeErr != nil { 468 + t.Logf("Failed to close database: %v", closeErr) 469 + } 470 + }
+333 -19
tests/integration/community_e2e_test.go
··· 5 5 "Coves/internal/api/routes" 6 6 "Coves/internal/atproto/identity" 7 7 "Coves/internal/atproto/jetstream" 8 + "Coves/internal/atproto/utils" 8 9 "Coves/internal/core/communities" 9 10 "Coves/internal/core/users" 10 11 "Coves/internal/db/postgres" ··· 213 214 t.Logf("\n📡 V2: Querying PDS for record in community's repository...") 214 215 215 216 collection := "social.coves.community.profile" 216 - rkey := extractRKeyFromURI(community.RecordURI) 217 + rkey := utils.ExtractRKeyFromURI(community.RecordURI) 217 218 218 219 // V2: Query community's repository (not instance repository!) 219 220 getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", ··· 423 424 // NOTE: Using synthetic event for speed. Real Jetstream WebSocket testing 424 425 // happens in "Part 2: Real Jetstream Firehose Consumption" above. 425 426 t.Logf("🔄 Simulating Jetstream consumer indexing...") 426 - rkey := extractRKeyFromURI(createResp.URI) 427 + rkey := utils.ExtractRKeyFromURI(createResp.URI) 427 428 // V2: Event comes from community's DID (community owns the repo) 428 429 event := jetstream.JetstreamEvent{ 429 430 Did: createResp.DID, ··· 626 627 pdsURL = "http://localhost:3001" 627 628 } 628 629 629 - rkey := extractRKeyFromURI(subscribeResp.URI) 630 + rkey := utils.ExtractRKeyFromURI(subscribeResp.URI) 630 631 // CRITICAL: Use correct collection name (record type, not XRPC endpoint) 631 632 collection := "social.coves.community.subscription" 632 633 ··· 686 687 CID: subscribeResp.CID, 687 688 Record: map[string]interface{}{ 688 689 "$type": "social.coves.community.subscription", 689 - "subject": community.DID, 690 + "subject": community.DID, 690 691 "contentVisibility": float64(5), // JSON numbers are float64 691 692 "createdAt": time.Now().Format(time.RFC3339), 692 693 }, ··· 757 758 } 758 759 759 760 // Index the subscription in AppView (simulate firehose event) 760 - rkey := extractRKeyFromURI(subscription.RecordURI) 761 + rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 761 762 subEvent := jetstream.JetstreamEvent{ 762 763 Did: instanceDID, 763 764 TimeUS: time.Now().UnixMicro(), ··· 770 771 CID: subscription.RecordCID, 771 772 Record: map[string]interface{}{ 772 773 "$type": "social.coves.community.subscription", 773 - "subject": community.DID, 774 + "subject": community.DID, 774 775 "contentVisibility": float64(3), 775 776 "createdAt": time.Now().Format(time.RFC3339), 776 777 }, ··· 892 893 Operation: "delete", 893 894 Collection: "social.coves.community.subscription", 894 895 RKey: rkey, 895 - CID: "", // No CID on deletes 896 - Record: nil, // No record data on deletes 896 + CID: "", // No CID on deletes 897 + Record: nil, // No record data on deletes 897 898 }, 898 899 } 899 900 if handleErr := consumer.HandleEvent(context.Background(), &deleteEvent); handleErr != nil { ··· 933 934 t.Logf(" ✓ Subscriber count decremented") 934 935 }) 935 936 937 + t.Run("Block via XRPC endpoint", func(t *testing.T) { 938 + // Create a community to block 939 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 940 + 941 + t.Logf("🚫 Blocking community via XRPC endpoint...") 942 + blockReq := map[string]interface{}{ 943 + "community": community.DID, 944 + } 945 + 946 + blockJSON, err := json.Marshal(blockReq) 947 + if err != nil { 948 + t.Fatalf("Failed to marshal block request: %v", err) 949 + } 950 + 951 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 952 + if err != nil { 953 + t.Fatalf("Failed to create block request: %v", err) 954 + } 955 + req.Header.Set("Content-Type", "application/json") 956 + req.Header.Set("Authorization", "Bearer "+accessToken) 957 + 958 + resp, err := http.DefaultClient.Do(req) 959 + if err != nil { 960 + t.Fatalf("Failed to POST block: %v", err) 961 + } 962 + defer func() { _ = resp.Body.Close() }() 963 + 964 + if resp.StatusCode != http.StatusOK { 965 + body, readErr := io.ReadAll(resp.Body) 966 + if readErr != nil { 967 + t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 968 + } 969 + t.Logf("❌ XRPC Block Failed") 970 + t.Logf(" Status: %d", resp.StatusCode) 971 + t.Logf(" Response: %s", string(body)) 972 + t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 973 + } 974 + 975 + var blockResp struct { 976 + Block struct { 977 + RecordURI string `json:"recordUri"` 978 + RecordCID string `json:"recordCid"` 979 + } `json:"block"` 980 + } 981 + 982 + if err := json.NewDecoder(resp.Body).Decode(&blockResp); err != nil { 983 + t.Fatalf("Failed to decode block response: %v", err) 984 + } 985 + 986 + t.Logf("✅ XRPC block response received:") 987 + t.Logf(" RecordURI: %s", blockResp.Block.RecordURI) 988 + t.Logf(" RecordCID: %s", blockResp.Block.RecordCID) 989 + 990 + // Extract rkey from URI for verification 991 + rkey := "" 992 + if uriParts := strings.Split(blockResp.Block.RecordURI, "/"); len(uriParts) >= 4 { 993 + rkey = uriParts[len(uriParts)-1] 994 + } 995 + 996 + // Verify the block record exists on PDS 997 + t.Logf("🔍 Verifying block record exists on PDS...") 998 + collection := "social.coves.community.block" 999 + pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1000 + pdsURL, instanceDID, collection, rkey)) 1001 + if pdsErr != nil { 1002 + t.Fatalf("Failed to query PDS: %v", pdsErr) 1003 + } 1004 + defer func() { 1005 + if closeErr := pdsResp.Body.Close(); closeErr != nil { 1006 + t.Logf("Failed to close PDS response: %v", closeErr) 1007 + } 1008 + }() 1009 + 1010 + if pdsResp.StatusCode != http.StatusOK { 1011 + body, readErr := io.ReadAll(pdsResp.Body) 1012 + if readErr != nil { 1013 + t.Fatalf("Block record not found on PDS (status: %d, failed to read body: %v)", pdsResp.StatusCode, readErr) 1014 + } 1015 + t.Fatalf("Block record not found on PDS (status: %d): %s", pdsResp.StatusCode, string(body)) 1016 + } 1017 + t.Logf("✅ Block record exists on PDS") 1018 + 1019 + // CRITICAL: Simulate Jetstream consumer indexing the block 1020 + t.Logf("🔄 Simulating Jetstream consumer indexing block event...") 1021 + blockEvent := jetstream.JetstreamEvent{ 1022 + Did: instanceDID, 1023 + TimeUS: time.Now().UnixMicro(), 1024 + Kind: "commit", 1025 + Commit: &jetstream.CommitEvent{ 1026 + Rev: "test-block-rev", 1027 + Operation: "create", 1028 + Collection: "social.coves.community.block", 1029 + RKey: rkey, 1030 + CID: blockResp.Block.RecordCID, 1031 + Record: map[string]interface{}{ 1032 + "subject": community.DID, 1033 + "createdAt": time.Now().Format(time.RFC3339), 1034 + }, 1035 + }, 1036 + } 1037 + if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1038 + t.Fatalf("Failed to handle block event: %v", handleErr) 1039 + } 1040 + 1041 + // Verify block was indexed in AppView 1042 + t.Logf("🔍 Verifying block indexed in AppView...") 1043 + block, err := communityRepo.GetBlock(ctx, instanceDID, community.DID) 1044 + if err != nil { 1045 + t.Fatalf("Failed to get block from AppView: %v", err) 1046 + } 1047 + if block.RecordURI != blockResp.Block.RecordURI { 1048 + t.Errorf("RecordURI mismatch: expected %s, got %s", blockResp.Block.RecordURI, block.RecordURI) 1049 + } 1050 + 1051 + t.Logf("✅ TRUE E2E BLOCK FLOW COMPLETE:") 1052 + t.Logf(" Client → XRPC Block → PDS Create → Firehose → Consumer → AppView ✓") 1053 + t.Logf(" ✓ Block record created on PDS") 1054 + t.Logf(" ✓ Block indexed in AppView") 1055 + }) 1056 + 1057 + t.Run("Unblock via XRPC endpoint", func(t *testing.T) { 1058 + // Create a community and block it first 1059 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1060 + 1061 + // Block the community 1062 + t.Logf("🚫 Blocking community first...") 1063 + blockReq := map[string]interface{}{ 1064 + "community": community.DID, 1065 + } 1066 + blockJSON, err := json.Marshal(blockReq) 1067 + if err != nil { 1068 + t.Fatalf("Failed to marshal block request: %v", err) 1069 + } 1070 + 1071 + blockHttpReq, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1072 + if err != nil { 1073 + t.Fatalf("Failed to create block request: %v", err) 1074 + } 1075 + blockHttpReq.Header.Set("Content-Type", "application/json") 1076 + blockHttpReq.Header.Set("Authorization", "Bearer "+accessToken) 1077 + 1078 + blockResp, err := http.DefaultClient.Do(blockHttpReq) 1079 + if err != nil { 1080 + t.Fatalf("Failed to POST block: %v", err) 1081 + } 1082 + 1083 + var blockRespData struct { 1084 + Block struct { 1085 + RecordURI string `json:"recordUri"` 1086 + } `json:"block"` 1087 + } 1088 + if err := json.NewDecoder(blockResp.Body).Decode(&blockRespData); err != nil { 1089 + func() { _ = blockResp.Body.Close() }() 1090 + t.Fatalf("Failed to decode block response: %v", err) 1091 + } 1092 + func() { _ = blockResp.Body.Close() }() 1093 + 1094 + rkey := "" 1095 + if uriParts := strings.Split(blockRespData.Block.RecordURI, "/"); len(uriParts) >= 4 { 1096 + rkey = uriParts[len(uriParts)-1] 1097 + } 1098 + 1099 + // Index the block via consumer 1100 + blockEvent := jetstream.JetstreamEvent{ 1101 + Did: instanceDID, 1102 + TimeUS: time.Now().UnixMicro(), 1103 + Kind: "commit", 1104 + Commit: &jetstream.CommitEvent{ 1105 + Rev: "test-block-rev", 1106 + Operation: "create", 1107 + Collection: "social.coves.community.block", 1108 + RKey: rkey, 1109 + CID: "test-block-cid", 1110 + Record: map[string]interface{}{ 1111 + "subject": community.DID, 1112 + "createdAt": time.Now().Format(time.RFC3339), 1113 + }, 1114 + }, 1115 + } 1116 + if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1117 + t.Fatalf("Failed to handle block event: %v", handleErr) 1118 + } 1119 + 1120 + // Now unblock the community 1121 + t.Logf("✅ Unblocking community via XRPC endpoint...") 1122 + unblockReq := map[string]interface{}{ 1123 + "community": community.DID, 1124 + } 1125 + 1126 + unblockJSON, err := json.Marshal(unblockReq) 1127 + if err != nil { 1128 + t.Fatalf("Failed to marshal unblock request: %v", err) 1129 + } 1130 + 1131 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.unblockCommunity", bytes.NewBuffer(unblockJSON)) 1132 + if err != nil { 1133 + t.Fatalf("Failed to create unblock request: %v", err) 1134 + } 1135 + req.Header.Set("Content-Type", "application/json") 1136 + req.Header.Set("Authorization", "Bearer "+accessToken) 1137 + 1138 + resp, err := http.DefaultClient.Do(req) 1139 + if err != nil { 1140 + t.Fatalf("Failed to POST unblock: %v", err) 1141 + } 1142 + defer func() { _ = resp.Body.Close() }() 1143 + 1144 + if resp.StatusCode != http.StatusOK { 1145 + body, readErr := io.ReadAll(resp.Body) 1146 + if readErr != nil { 1147 + t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 1148 + } 1149 + t.Logf("❌ XRPC Unblock Failed") 1150 + t.Logf(" Status: %d", resp.StatusCode) 1151 + t.Logf(" Response: %s", string(body)) 1152 + t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 1153 + } 1154 + 1155 + var unblockResp struct { 1156 + Success bool `json:"success"` 1157 + } 1158 + 1159 + if err := json.NewDecoder(resp.Body).Decode(&unblockResp); err != nil { 1160 + t.Fatalf("Failed to decode unblock response: %v", err) 1161 + } 1162 + 1163 + if !unblockResp.Success { 1164 + t.Errorf("Expected success: true, got: %v", unblockResp.Success) 1165 + } 1166 + 1167 + // Verify the block record was deleted from PDS 1168 + t.Logf("🔍 Verifying block record deleted from PDS...") 1169 + collection := "social.coves.community.block" 1170 + pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1171 + pdsURL, instanceDID, collection, rkey)) 1172 + if pdsErr != nil { 1173 + t.Fatalf("Failed to query PDS: %v", pdsErr) 1174 + } 1175 + defer func() { 1176 + if closeErr := pdsResp.Body.Close(); closeErr != nil { 1177 + t.Logf("Failed to close PDS response: %v", closeErr) 1178 + } 1179 + }() 1180 + 1181 + if pdsResp.StatusCode == http.StatusOK { 1182 + t.Errorf("❌ Block record still exists on PDS (expected 404, got 200)") 1183 + } else { 1184 + t.Logf("✅ Block record successfully deleted from PDS (status: %d)", pdsResp.StatusCode) 1185 + } 1186 + 1187 + // CRITICAL: Simulate Jetstream consumer indexing the DELETE event 1188 + t.Logf("🔄 Simulating Jetstream consumer indexing DELETE event...") 1189 + deleteEvent := jetstream.JetstreamEvent{ 1190 + Did: instanceDID, 1191 + TimeUS: time.Now().UnixMicro(), 1192 + Kind: "commit", 1193 + Commit: &jetstream.CommitEvent{ 1194 + Rev: "test-unblock-rev", 1195 + Operation: "delete", 1196 + Collection: "social.coves.community.block", 1197 + RKey: rkey, 1198 + CID: "", 1199 + Record: nil, 1200 + }, 1201 + } 1202 + if handleErr := consumer.HandleEvent(context.Background(), &deleteEvent); handleErr != nil { 1203 + t.Fatalf("Failed to handle delete event: %v", handleErr) 1204 + } 1205 + 1206 + // Verify block was removed from AppView 1207 + t.Logf("🔍 Verifying block removed from AppView...") 1208 + _, err = communityRepo.GetBlock(ctx, instanceDID, community.DID) 1209 + if err == nil { 1210 + t.Errorf("❌ Block still exists in AppView (should be deleted)") 1211 + } else if !communities.IsNotFound(err) { 1212 + t.Fatalf("Unexpected error querying block: %v", err) 1213 + } else { 1214 + t.Logf("✅ Block removed from AppView") 1215 + } 1216 + 1217 + t.Logf("✅ TRUE E2E UNBLOCK FLOW COMPLETE:") 1218 + t.Logf(" Client → XRPC Unblock → PDS Delete → Firehose → Consumer → AppView ✓") 1219 + t.Logf(" ✓ Block deleted from PDS") 1220 + t.Logf(" ✓ Block removed from AppView") 1221 + }) 1222 + 1223 + t.Run("Block fails without authentication", func(t *testing.T) { 1224 + // Create a community to attempt blocking 1225 + community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1226 + 1227 + t.Logf("🔒 Attempting to block community without auth token...") 1228 + blockReq := map[string]interface{}{ 1229 + "community": community.DID, 1230 + } 1231 + 1232 + blockJSON, err := json.Marshal(blockReq) 1233 + if err != nil { 1234 + t.Fatalf("Failed to marshal block request: %v", err) 1235 + } 1236 + 1237 + req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1238 + if err != nil { 1239 + t.Fatalf("Failed to create block request: %v", err) 1240 + } 1241 + req.Header.Set("Content-Type", "application/json") 1242 + // NO Authorization header 1243 + 1244 + resp, err := http.DefaultClient.Do(req) 1245 + if err != nil { 1246 + t.Fatalf("Failed to POST block: %v", err) 1247 + } 1248 + defer func() { _ = resp.Body.Close() }() 1249 + 1250 + // Should fail with 401 Unauthorized 1251 + if resp.StatusCode != http.StatusUnauthorized { 1252 + body, _ := io.ReadAll(resp.Body) 1253 + t.Errorf("Expected 401 Unauthorized, got %d: %s", resp.StatusCode, string(body)) 1254 + } else { 1255 + t.Logf("✅ Block correctly rejected without authentication (401)") 1256 + } 1257 + }) 1258 + 936 1259 t.Run("Update via XRPC endpoint", func(t *testing.T) { 937 1260 // Create a community first (via service, so it's indexed) 938 1261 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) ··· 1009 1332 1010 1333 // Simulate Jetstream consumer picking up the update event 1011 1334 t.Logf("🔄 Simulating Jetstream consumer indexing update...") 1012 - rkey := extractRKeyFromURI(updateResp.URI) 1335 + rkey := utils.ExtractRKeyFromURI(updateResp.URI) 1013 1336 1014 1337 // Fetch updated record from PDS 1015 1338 pdsURL := os.Getenv("PDS_URL") ··· 1136 1459 // Fetch from PDS to get full record 1137 1460 // V2: Record lives in community's own repository (at://community.DID/...) 1138 1461 collection := "social.coves.community.profile" 1139 - rkey := extractRKeyFromURI(community.RecordURI) 1462 + rkey := utils.ExtractRKeyFromURI(community.RecordURI) 1140 1463 1141 1464 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1142 1465 pdsURL, community.DID, collection, rkey)) ··· 1180 1503 } 1181 1504 1182 1505 return community 1183 - } 1184 - 1185 - func extractRKeyFromURI(uri string) string { 1186 - // at://did/collection/rkey -> rkey 1187 - parts := strings.Split(uri, "/") 1188 - if len(parts) >= 4 { 1189 - return parts[len(parts)-1] 1190 - } 1191 - return "" 1192 1506 } 1193 1507 1194 1508 // authenticateWithPDS authenticates with the PDS and returns access token and DID
+12 -12
tests/integration/subscription_indexing_test.go
··· 46 46 RKey: rkey, 47 47 CID: "bafytest123", 48 48 Record: map[string]interface{}{ 49 - "$type": "social.coves.community.subscription", 50 - "subject": community.DID, 49 + "$type": "social.coves.community.subscription", 50 + "subject": community.DID, 51 51 "createdAt": time.Now().Format(time.RFC3339), 52 52 "contentVisibility": float64(5), // JSON numbers decode as float64 53 53 }, ··· 101 101 RKey: rkey, 102 102 CID: "bafydefault", 103 103 Record: map[string]interface{}{ 104 - "$type": "social.coves.community.subscription", 105 - "subject": community.DID, 104 + "$type": "social.coves.community.subscription", 105 + "subject": community.DID, 106 106 "createdAt": time.Now().Format(time.RFC3339), 107 107 // contentVisibility NOT provided 108 108 }, ··· 130 130 131 131 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 132 132 testCases := []struct { 133 + name string 133 134 input float64 134 135 expected int 135 - name string 136 136 }{ 137 137 {input: 0, expected: 1, name: "zero clamped to 1"}, 138 138 {input: -5, expected: 1, name: "negative clamped to 1"}, ··· 201 201 RKey: rkey, 202 202 CID: "bafyidempotent", 203 203 Record: map[string]interface{}{ 204 - "$type": "social.coves.community.subscription", 205 - "subject": community.DID, 204 + "$type": "social.coves.community.subscription", 205 + "subject": community.DID, 206 206 "createdAt": time.Now().Format(time.RFC3339), 207 207 "contentVisibility": float64(4), 208 208 }, ··· 268 268 RKey: rkey, 269 269 CID: "bafycreate", 270 270 Record: map[string]interface{}{ 271 - "$type": "social.coves.community.subscription", 272 - "subject": community.DID, 271 + "$type": "social.coves.community.subscription", 272 + "subject": community.DID, 273 273 "createdAt": time.Now().Format(time.RFC3339), 274 274 "contentVisibility": float64(3), 275 275 }, ··· 298 298 Operation: "delete", 299 299 Collection: "social.coves.community.subscription", 300 300 RKey: rkey, 301 - CID: "", // No CID on deletes 301 + CID: "", // No CID on deletes 302 302 Record: nil, // No record data on deletes 303 303 }, 304 304 } ··· 390 390 RKey: rkey, 391 391 CID: "bafycount", 392 392 Record: map[string]interface{}{ 393 - "$type": "social.coves.community.subscription", 394 - "subject": community.DID, 393 + "$type": "social.coves.community.subscription", 394 + "subject": community.DID, 395 395 "createdAt": time.Now().Format(time.RFC3339), 396 396 "contentVisibility": float64(3), 397 397 },
+24
tests/unit/community_service_test.go
··· 102 102 return nil, nil 103 103 } 104 104 105 + func (m *mockCommunityRepo) BlockCommunity(ctx context.Context, block *communities.CommunityBlock) (*communities.CommunityBlock, error) { 106 + return block, nil 107 + } 108 + 109 + func (m *mockCommunityRepo) UnblockCommunity(ctx context.Context, userDID, communityDID string) error { 110 + return nil 111 + } 112 + 113 + func (m *mockCommunityRepo) GetBlock(ctx context.Context, userDID, communityDID string) (*communities.CommunityBlock, error) { 114 + return nil, communities.ErrBlockNotFound 115 + } 116 + 117 + func (m *mockCommunityRepo) GetBlockByURI(ctx context.Context, recordURI string) (*communities.CommunityBlock, error) { 118 + return nil, communities.ErrBlockNotFound 119 + } 120 + 121 + func (m *mockCommunityRepo) ListBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*communities.CommunityBlock, error) { 122 + return nil, nil 123 + } 124 + 125 + func (m *mockCommunityRepo) IsBlocked(ctx context.Context, userDID, communityDID string) (bool, error) { 126 + return false, nil 127 + } 128 + 105 129 func (m *mockCommunityRepo) CreateMembership(ctx context.Context, membership *communities.Membership) (*communities.Membership, error) { 106 130 return membership, nil 107 131 }