Integrate aggregator components into server initialization and
register XRPC endpoints.
Changes to cmd/server/main.go:
- Initialize aggregator repository
- Initialize aggregator service (with community service dependency)
- Update post service to include aggregator service
- Register aggregator XRPC routes (3 query endpoints)
- Start aggregator Jetstream consumer in background goroutine
- Add comprehensive startup logging
Server startup output:
✅ Aggregator service initialized
Started Jetstream aggregator consumer: ws://localhost:6008/subscribe?...
- Indexing: social.coves.aggregator.service (service declarations)
- Indexing: social.coves.aggregator.authorization (authorization records)
Aggregator XRPC endpoints registered (query endpoints public)
Architecture:
- Aggregator service depends on: aggregator repo, community service
- Post service depends on: aggregator service (for auth checks)
- Jetstream consumer runs independently, indexes to DB via repository
- XRPC handlers call service layer methods
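The dependency graph above could be wired roughly as follows. This is a minimal sketch only: every type and constructor name here is hypothetical and is not taken from cmd/server/main.go; it only illustrates that the aggregator service and the Jetstream consumer share one repository while the post service depends on the aggregator service.

```go
package main

import "fmt"

// All types below are hypothetical stand-ins for the real components.
type aggregatorRepo struct{ name string }
type communityService struct{}

type aggregatorService struct {
	repo        *aggregatorRepo
	communities *communityService
}

type postService struct{ aggregators *aggregatorService }

type jetstreamConsumer struct{ repo *aggregatorRepo }

func newAggregatorService(r *aggregatorRepo, c *communityService) *aggregatorService {
	return &aggregatorService{repo: r, communities: c}
}

func newPostService(a *aggregatorService) *postService {
	return &postService{aggregators: a}
}

func newJetstreamConsumer(r *aggregatorRepo) *jetstreamConsumer {
	return &jetstreamConsumer{repo: r}
}

// wire builds the components in dependency order: repository first,
// then the aggregator service, then the post service and the consumer.
func wire() (*postService, *jetstreamConsumer) {
	repo := &aggregatorRepo{name: "postgres"}
	aggSvc := newAggregatorService(repo, &communityService{})
	return newPostService(aggSvc), newJetstreamConsumer(repo)
}

func main() {
	posts, consumer := wire()
	// The consumer writes through the same repository the services read from.
	fmt.Println(posts.aggregators.repo == consumer.repo)
}
```

In a real server the consumer would be started in its own goroutine after wiring, as the change list describes.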
Phase 1 complete:
✅ Aggregators can authenticate (via JWT)
✅ Aggregators can post to authorized communities
✅ Rate limiting enforced (10 posts/hour per community per aggregator)
✅ Query endpoints available for discovery
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implement Jetstream consumer to index aggregator service declarations
and authorization records from the firehose in real-time.
aggregator_consumer.go:
- Handles social.coves.aggregator.service records (create/update/delete)
- Handles social.coves.aggregator.authorization records (create/update/delete)
- Upsert logic for both create and update operations
- Delete by URI for authorization cleanup
- Validation:
  * Service rkey must be "self" (canonical location)
  * communityDid in authorization must match repo DID (prevents forgery)
  * did in service must match repo DID (prevents DID spoofing)
  * Required fields validation
- Avatar blob extraction from atProto blob ref
- createdAt parsing from RFC3339 with fallback
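The validation rules listed above might look like the sketch below. The record structs, field names, and function names are hypothetical, and the fallback to the current time for an unparseable createdAt is an assumption; the commit only says "with fallback".

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical, trimmed-down record shapes for illustration only.
type serviceRecord struct {
	DID       string
	CreatedAt string
}

type authorizationRecord struct {
	CommunityDID  string
	AggregatorDID string
}

// validateService enforces the canonical rkey and the repo DID match.
func validateService(repoDID, rkey string, rec serviceRecord) error {
	if rkey != "self" {
		return fmt.Errorf("service rkey must be %q, got %q", "self", rkey)
	}
	if rec.DID == "" {
		return errors.New("service record is missing did")
	}
	if rec.DID != repoDID {
		return errors.New("did does not match repo DID")
	}
	return nil
}

// validateAuthorization rejects records whose communityDid is not the
// DID of the repo that published them.
func validateAuthorization(repoDID string, rec authorizationRecord) error {
	if rec.CommunityDID == "" || rec.AggregatorDID == "" {
		return errors.New("authorization record is missing required fields")
	}
	if rec.CommunityDID != repoDID {
		return errors.New("communityDid does not match repo DID")
	}
	return nil
}

// parseCreatedAt parses an RFC3339 timestamp. On failure it falls back to
// the current time (an assumption) and reports that the fallback was used.
func parseCreatedAt(raw string) (time.Time, bool) {
	t, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return time.Now().UTC(), true
	}
	return t, false
}

func main() {
	err := validateService("did:plc:aggregator", "self",
		serviceRecord{DID: "did:plc:someoneelse"})
	fmt.Println(err)

	_, usedFallback := parseCreatedAt("not-a-timestamp")
	fmt.Println(usedFallback)
}
```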
aggregator_jetstream_connector.go:
- WebSocket connection management with auto-reconnect
- Ping/pong keepalive
- Graceful error handling (continues on parsing errors)
- Filters for wanted collections
Jetstream URL:
ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization
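A URL of this shape can be assembled with the standard library by repeating the wantedCollections parameter once per collection. The helper name buildJetstreamURL is hypothetical; this is a sketch of one way to build the string, not necessarily how the connector does it.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildJetstreamURL adds one wantedCollections query parameter per
// collection to the given subscribe endpoint.
func buildJetstreamURL(base string, collections []string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for _, c := range collections {
		q.Add("wantedCollections", c)
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	u, err := buildJetstreamURL("ws://localhost:6008/subscribe", []string{
		"social.coves.aggregator.service",
		"social.coves.aggregator.authorization",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```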
Indexed to database:
- aggregators table (stats auto-updated via triggers)
- aggregator_authorizations table (unique constraint on aggregator+community)
Security:
- DID validation prevents impersonation
- communityDid validation prevents authorization forgery
- Graceful error handling prevents consumer crashes
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implement XRPC query endpoints for aggregator discovery and
authorization management.
Handlers (internal/api/handlers/aggregator/):
- get_services.go: Fetch aggregator details by DIDs
  * Supports detailed=true for stats (communities_using, posts_created)
  * Returns aggregatorView or aggregatorViewDetailed union type
  * Bulk query optimization for multiple DIDs
- get_authorizations.go: List communities using an aggregator
  * Nested aggregatorView in response (lexicon compliance)
  * Supports enabledOnly filter and pagination
- list_for_community.go: List aggregators for a community
  * Accepts at-identifier (DID or handle) for community
  * Returns authorizationView with config
  * Supports enabledOnly filter and pagination
- errors.go: Error handling with domain error mapping
  * Maps domain errors to appropriate HTTP status codes
  * 404 for not found, 400 for validation, 501 for not implemented
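The domain-error mapping described for errors.go could be sketched like this. The sentinel error names and the statusFor helper are hypothetical; only the status codes come from the description above, with 500 as the default for anything unrecognized.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical sentinel errors standing in for the domain errors.
var (
	errNotFound       = errors.New("aggregator not found")
	errValidation     = errors.New("validation failed")
	errNotImplemented = errors.New("not implemented")
)

// statusFor maps a domain error to an HTTP status code. errors.Is lets
// wrapped errors match their sentinel.
func statusFor(err error) int {
	switch {
	case errors.Is(err, errNotFound):
		return http.StatusNotFound
	case errors.Is(err, errValidation):
		return http.StatusBadRequest
	case errors.Is(err, errNotImplemented):
		return http.StatusNotImplemented
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("get services: %w", errNotFound)
	fmt.Println(statusFor(wrapped))
	fmt.Println(statusFor(errors.New("database unreachable")))
}
```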
Routes (internal/api/routes/aggregator.go):
- GET /xrpc/social.coves.aggregator.getServices?dids=...
- GET /xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=...
- GET /xrpc/social.coves.aggregator.listForCommunity?community=...
Features:
- Query parameter parsing with validation
- Domain model to API view conversion
- JSON response formatting matching lexicon
- Proper HTTP status codes (404, 400, 500, 501)
- Config unmarshal from JSONB to interface{}
Deferred to Phase 2:
- Write endpoints (enable, disable, updateConfig) return 501 Not Implemented
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Modify post creation flow to support aggregator posting with
server-side validation and rate limiting.
Changes to internal/core/posts/service.go:
- Server-side aggregator detection via database query
- Dual validation flow:
  * Aggregators: Authorization + rate limits, skip membership checks
  * Users: Existing visibility/membership validation
- Post tracking after successful creation for rate limiting
- Clear logging to distinguish aggregator vs user posts
Changes to internal/core/posts/errors.go:
- Added ErrRateLimitExceeded for aggregator rate limiting
Changes to internal/api/handlers/post/errors.go:
- Map both aggregators.ErrRateLimitExceeded and posts.ErrRateLimitExceeded to 429
Security:
- DID extracted from JWT (cryptographically verified)
- Database lookup confirms aggregator status (no client-provided flag)
- Authorization checked against indexed records from firehose
- Rate limiting: 10 posts/hour per community per aggregator
Flow:
1. Extract DID from JWT (verified by auth middleware)
2. Query: Is this DID an aggregator? (database lookup)
3a. If aggregator: Check authorization + rate limits
3b. If user: Check community membership/visibility
4. Write post to PDS
5. If aggregator: Record post for rate limiting
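The flow above can be sketched with an in-memory stand-in for the database. Everything here is hypothetical (the store type, its maps, the error names, and the injected clock); the real service queries Postgres and writes to the PDS, which this sketch omits. Only the branching and the 10 posts/hour limit follow the description.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errNotAuthorized     = errors.New("aggregator not authorized for community")
	errRateLimitExceeded = errors.New("rate limit exceeded")
	errNotMember         = errors.New("user may not post to community")
)

const maxPostsPerHour = 10

// store is a hypothetical in-memory replacement for the database lookups.
type store struct {
	aggregators map[string]bool        // DID -> is aggregator
	authorized  map[string]bool        // aggregator|community -> authorized
	members     map[string]bool        // user|community -> may post
	posts       map[string][]time.Time // aggregator|community -> post times
	now         func() time.Time
}

func newStore() *store {
	return &store{
		aggregators: map[string]bool{},
		authorized:  map[string]bool{},
		members:     map[string]bool{},
		posts:       map[string][]time.Time{},
		now:         time.Now,
	}
}

func key(did, community string) string { return did + "|" + community }

// recentPosts counts posts recorded within the last hour.
func (s *store) recentPosts(k string) int {
	n := 0
	for _, t := range s.posts[k] {
		if s.now().Sub(t) < time.Hour {
			n++
		}
	}
	return n
}

// createPost takes a DID that auth middleware has already verified.
func (s *store) createPost(did, community string) error {
	k := key(did, community)
	isAggregator := s.aggregators[did] // step 2: server-side lookup
	if isAggregator {
		if !s.authorized[k] { // step 3a
			return errNotAuthorized
		}
		if s.recentPosts(k) >= maxPostsPerHour {
			return errRateLimitExceeded
		}
	} else if !s.members[k] { // step 3b
		return errNotMember
	}
	// Step 4 (write post to PDS) is omitted in this sketch.
	if isAggregator {
		s.posts[k] = append(s.posts[k], s.now()) // step 5
	}
	return nil
}

func main() {
	s := newStore()
	s.aggregators["did:plc:agg"] = true
	s.authorized[key("did:plc:agg", "did:plc:community")] = true
	for i := 1; i <= maxPostsPerHour+1; i++ {
		if err := s.createPost("did:plc:agg", "did:plc:community"); err != nil {
			fmt.Printf("post %d rejected: %v\n", i, err)
		}
	}
}
```

Recording the post only after the write succeeds means a failed write does not consume part of the aggregator's hourly budget.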
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implement complete repository layer for aggregator data access with
optimized queries and bulk operations.
Domain models (internal/core/aggregators/):
- aggregator.go: Aggregator and Authorization domain types
- interfaces.go: Repository and Service interfaces
- errors.go: Domain-specific errors with IsXxx helpers
Repository (internal/db/postgres/aggregator_repo.go):
- CRUD operations for aggregators and authorizations
- Fast IsAggregator() check using EXISTS query
- Fast IsAuthorized() check with optimized partial index
- Bulk GetAggregatorsByDIDs() for efficient multi-DID queries
- Post tracking for rate limiting
- Upsert logic with ON CONFLICT for Jetstream indexing
- Delete by URI for Jetstream delete operations
Performance:
- Uses idx_aggregator_auth_lookup for <5ms authorization checks
- Uses idx_aggregator_posts_rate_limit for fast rate limit queries
- Parameterized queries throughout (guards against SQL injection)
- Bulk operations avoid N+1 query patterns
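As an illustration of combining parameterized queries with a bulk lookup, the sketch below builds a single IN query with one numbered placeholder per DID. The function name, the selected column, and the did column name are assumptions; the repository may well use a different form such as an array parameter.

```go
package main

import (
	"fmt"
	"strings"
)

// bulkAggregatorQuery returns one parameterized query for all DIDs
// instead of one query per DID. It returns an empty query for empty
// input so the caller can skip the round trip.
func bulkAggregatorQuery(dids []string) (string, []any) {
	if len(dids) == 0 {
		return "", nil
	}
	placeholders := make([]string, len(dids))
	args := make([]any, len(dids))
	for i, d := range dids {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = d
	}
	query := "SELECT did FROM aggregators WHERE did IN (" +
		strings.Join(placeholders, ", ") + ")"
	return query, args
}

func main() {
	q, args := bulkAggregatorQuery([]string{"did:plc:a", "did:plc:b"})
	fmt.Println(q)
	fmt.Println(len(args))
}
```

The DID values travel only in the argument slice, never in the query text, which is what keeps the query safe from injection.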
Dependencies:
- Added gojsonschema for config validation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>