A community-based topic aggregation platform built on atproto
1package communities
2
3import (
4 oauthclient "Coves/internal/atproto/oauth"
5 "Coves/internal/atproto/pds"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/blobs"
8 "bytes"
9 "context"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "io"
14 "log"
15 "net/http"
16 "regexp"
17 "strings"
18 "sync"
19 "time"
20
21 "github.com/bluesky-social/indigo/atproto/auth/oauth"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23)
24
// Precompiled validation patterns for community handles, single DNS labels,
// and instance domains. Each label is 1-63 characters of letters, digits and
// hyphens and may not start or end with a hyphen (RFC 1035 style).
var (
	// communityHandleRegex matches a full handle such as
	// name.community.instance.com: one or more dot-terminated labels followed
	// by a final label that must begin with a letter.
	communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// dnsLabelRegex matches exactly one DNS label (no dots).
	dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// domainRegex is a simplified hostname check: zero or more dot-terminated
	// labels followed by a final label that must begin with a letter, so a
	// single-label name is accepted as well.
	domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
)
34
// PDSClientFactory creates a pds.Client from OAuth session data.
// It exists so a different auth mechanism can be injected (OAuth for
// production, password for tests): when a factory is set on the service,
// getPDSClient calls it instead of building a DPoP-authenticated client.
type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error)
38
// communityService is the concrete type returned (as Service) by
// NewCommunityService and NewCommunityServiceWithPDSFactory.
type communityService struct {
	// Interfaces and pointers first (better alignment)
	repo        Repository             // AppView DB access (lookups, listing, credential updates)
	provisioner *PDSAccountProvisioner // provisions the community's PDS account in CreateCommunity
	blobService blobs.Service          // uploads avatar/banner blobs; callers check for nil before use

	// OAuth client for user PDS authentication (DPoP-based)
	oauthClient      *oauthclient.OAuthClient
	pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth.

	// Token refresh concurrency control
	// Each community gets its own mutex to prevent concurrent refresh attempts
	// (keyed by community DID; entries are created on demand and never evicted)
	refreshMutexes map[string]*sync.Mutex

	// Strings
	pdsURL         string
	instanceDID    string // written as hostedBy on communities created by this instance
	instanceDomain string // compared against a did:web instanceDID at construction time
	pdsAccessToken string // set via SetPDSAccessToken

	// Sync primitives last
	mapMutex sync.RWMutex // Protects refreshMutexes map itself
}
62
const (
	// maxMutexCacheSize is the recommended maximum number of entries in the
	// per-community refresh mutex map. It is a warning threshold, not a hard
	// limit: getOrCreateRefreshMutex only logs when the map grows past it and
	// never evicts entries.
	// Sizing estimate used by that warning: 10,000 entries × 16 bytes = ~160KB
	// (negligible overhead); even 100,000 entries = 1.6MB is acceptable.
	maxMutexCacheSize = 10000
)
69
70// NewCommunityService creates a new community service with OAuth client for user authentication
71func NewCommunityService(
72 repo Repository,
73 pdsURL, instanceDID, instanceDomain string,
74 provisioner *PDSAccountProvisioner,
75 oauthClient *oauthclient.OAuthClient,
76 blobService blobs.Service,
77) Service {
78 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
79 // This catches honest configuration mistakes but NOT malicious code modifications
80 // Full verification (Phase 2) requires fetching DID document from domain
81 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
82 if strings.HasPrefix(instanceDID, "did:web:") {
83 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
84 if didDomain != instanceDomain {
85 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
86 didDomain, instanceDomain)
87 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
88 log.Printf(" Communities will be hosted by: %s", instanceDID)
89 }
90 }
91
92 return &communityService{
93 repo: repo,
94 pdsURL: pdsURL,
95 instanceDID: instanceDID,
96 instanceDomain: instanceDomain,
97 provisioner: provisioner,
98 oauthClient: oauthClient,
99 blobService: blobService,
100 refreshMutexes: make(map[string]*sync.Mutex),
101 }
102}
103
104// NewCommunityServiceWithPDSFactory creates a community service with a custom PDS client factory.
105// This is primarily for testing with password-based authentication.
106func NewCommunityServiceWithPDSFactory(
107 repo Repository,
108 pdsURL, instanceDID, instanceDomain string,
109 provisioner *PDSAccountProvisioner,
110 factory PDSClientFactory,
111 blobService blobs.Service,
112) Service {
113 return &communityService{
114 repo: repo,
115 pdsURL: pdsURL,
116 instanceDID: instanceDID,
117 instanceDomain: instanceDomain,
118 provisioner: provisioner,
119 pdsClientFactory: factory,
120 blobService: blobService,
121 refreshMutexes: make(map[string]*sync.Mutex),
122 }
123}
124
// SetPDSAccessToken sets the PDS access token for authentication.
// This should be called after creating a session for the Coves instance DID on the PDS.
// The field is written without any locking.
// NOTE(review): presumably called once during startup, before requests are served — confirm.
func (s *communityService) SetPDSAccessToken(token string) {
	s.pdsAccessToken = token
}
130
131// getPDSClient creates a PDS client from an OAuth session.
132// If a custom factory was provided (for testing), uses that.
133// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling.
134func (s *communityService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) {
135 // Use custom factory if provided (e.g., for testing with password auth)
136 if s.pdsClientFactory != nil {
137 return s.pdsClientFactory(ctx, session)
138 }
139
140 // Production path: use OAuth with DPoP
141 if s.oauthClient == nil || s.oauthClient.ClientApp == nil {
142 log.Printf("[OAUTH_ERROR] getPDSClient called but OAuth client is not configured - check server initialization")
143 return nil, fmt.Errorf("OAuth client not configured")
144 }
145
146 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session)
147 if err != nil {
148 return nil, fmt.Errorf("failed to create PDS client: %w", err)
149 }
150
151 return client, nil
152}
153
154// CreateCommunity creates a new community via write-forward to PDS
155// V2 Flow:
156// 1. Service creates PDS account for community (PDS generates signing keypair)
157// 2. Service writes community profile to COMMUNITY's own repository
158// 3. Firehose emits event
159// 4. Consumer indexes to AppView DB
160//
161// V2 Architecture:
162// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
163// - PDS manages the signing keypair (we never see it)
164// - We store PDS credentials to act on behalf of the community
165// - Community can migrate to other instances (future V2.1 with rotation keys)
166func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
167 // Apply defaults before validation
168 if req.Visibility == "" {
169 req.Visibility = "public"
170 }
171
172 // SECURITY: Auto-populate hostedByDID from instance configuration
173 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
174 // This prevents malicious instances from claiming to host communities for domains they don't own
175 req.HostedByDID = s.instanceDID
176
177 // Validate request
178 if err := s.validateCreateRequest(req); err != nil {
179 return nil, err
180 }
181
182 // V2: Provision a real PDS account for this community
183 // This calls com.atproto.server.createAccount internally
184 // The PDS will:
185 // 1. Generate a signing keypair (stored in PDS, we never see it)
186 // 2. Create a DID (did:plc:xxx)
187 // 3. Return credentials (DID, tokens)
188 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
189 if err != nil {
190 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
191 }
192
193 // Validate the atProto handle
194 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
195 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
196 }
197
198 // Build community profile record
199 profile := map[string]interface{}{
200 "$type": "social.coves.community.profile",
201 "name": req.Name, // Short name for !mentions (e.g., "gaming")
202 "visibility": req.Visibility,
203 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
204 "createdBy": req.CreatedByDID,
205 "createdAt": time.Now().Format(time.RFC3339),
206 "federation": map[string]interface{}{
207 "allowExternalDiscovery": req.AllowExternalDiscovery,
208 },
209 }
210
211 // Add optional fields
212 if req.DisplayName != "" {
213 profile["displayName"] = req.DisplayName
214 }
215 if req.Description != "" {
216 profile["description"] = req.Description
217 }
218 if len(req.Rules) > 0 {
219 profile["rules"] = req.Rules
220 }
221 if len(req.Categories) > 0 {
222 profile["categories"] = req.Categories
223 }
224 if req.Language != "" {
225 profile["language"] = req.Language
226 }
227
228 // Track blob CIDs for storage in the community record
229 var avatarCID, bannerCID string
230
231 // Upload avatar if provided
232 if len(req.AvatarBlob) > 0 {
233 if req.AvatarMimeType == "" {
234 return nil, fmt.Errorf("avatarMimeType is required when avatarBlob is provided")
235 }
236 if s.blobService == nil {
237 return nil, fmt.Errorf("blob service not configured, cannot upload avatar")
238 }
239 avatarRef, err := s.blobService.UploadBlob(ctx, pdsAccount, req.AvatarBlob, req.AvatarMimeType)
240 if err != nil {
241 return nil, fmt.Errorf("failed to upload avatar: %w", err)
242 }
243 profile["avatar"] = map[string]interface{}{
244 "$type": avatarRef.Type,
245 "ref": avatarRef.Ref,
246 "mimeType": avatarRef.MimeType,
247 "size": avatarRef.Size,
248 }
249 // Extract CID for database storage
250 avatarCID = avatarRef.Ref["$link"]
251 }
252
253 // Upload banner if provided
254 if len(req.BannerBlob) > 0 {
255 if req.BannerMimeType == "" {
256 return nil, fmt.Errorf("bannerMimeType is required when bannerBlob is provided")
257 }
258 if s.blobService == nil {
259 return nil, fmt.Errorf("blob service not configured, cannot upload banner")
260 }
261 bannerRef, err := s.blobService.UploadBlob(ctx, pdsAccount, req.BannerBlob, req.BannerMimeType)
262 if err != nil {
263 return nil, fmt.Errorf("failed to upload banner: %w", err)
264 }
265 profile["banner"] = map[string]interface{}{
266 "$type": bannerRef.Type,
267 "ref": bannerRef.Ref,
268 "mimeType": bannerRef.MimeType,
269 "size": bannerRef.Size,
270 }
271 // Extract CID for database storage
272 bannerCID = bannerRef.Ref["$link"]
273 }
274
275 // V2: Write to COMMUNITY's own repository (not instance repo!)
276 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
277 // Authenticate using community's access token
278 recordURI, recordCID, err := s.createRecordOnPDSAs(
279 ctx,
280 pdsAccount.DID, // repo = community's DID (community owns its repo!)
281 "social.coves.community.profile",
282 "self", // canonical rkey for profile
283 profile,
284 pdsAccount.AccessToken, // authenticate as the community
285 )
286 if err != nil {
287 return nil, fmt.Errorf("failed to create community profile record: %w", err)
288 }
289
290 // Build Community object with PDS credentials AND cryptographic keys
291 community := &Community{
292 DID: pdsAccount.DID, // Community's DID (owns the repo!)
293 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
294 Name: req.Name,
295 DisplayName: req.DisplayName,
296 Description: req.Description,
297 OwnerDID: pdsAccount.DID, // V2: Community owns itself
298 CreatedByDID: req.CreatedByDID,
299 HostedByDID: req.HostedByDID,
300 PDSEmail: pdsAccount.Email,
301 PDSPassword: pdsAccount.Password,
302 PDSAccessToken: pdsAccount.AccessToken,
303 PDSRefreshToken: pdsAccount.RefreshToken,
304 PDSURL: pdsAccount.PDSURL,
305 Visibility: req.Visibility,
306 AllowExternalDiscovery: req.AllowExternalDiscovery,
307 AvatarCID: avatarCID,
308 BannerCID: bannerCID,
309 MemberCount: 0,
310 SubscriberCount: 0,
311 CreatedAt: time.Now(),
312 UpdatedAt: time.Now(),
313 RecordURI: recordURI,
314 RecordCID: recordCID,
315 // V2: Cryptographic keys for portability (will be encrypted by repository)
316 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
317 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
318 }
319
320 // CRITICAL: Persist PDS credentials immediately to database
321 // The Jetstream consumer will eventually index the community profile from the firehose,
322 // but it won't have the PDS credentials. We must store them now so we can:
323 // 1. Update the community profile later (using its own credentials)
324 // 2. Re-authenticate if access tokens expire
325 _, err = s.repo.Create(ctx, community)
326 if err != nil {
327 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
328 }
329
330 return community, nil
331}
332
333// GetCommunity retrieves a community from AppView DB
334// identifier can be:
335// - DID: did:plc:xxx
336// - Scoped handle: !name@instance
337// - At-identifier: @c-name.domain
338// - Canonical handle: c-name.domain
339func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
340 originalIdentifier := identifier
341 identifier = strings.TrimSpace(identifier)
342
343 if identifier == "" {
344 return nil, ErrInvalidInput
345 }
346
347 // 1. DID format
348 if strings.HasPrefix(identifier, "did:") {
349 community, err := s.repo.GetByDID(ctx, identifier)
350 if err != nil {
351 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err)
352 }
353 return community, nil
354 }
355
356 // 2. Scoped format: !name@instance
357 if strings.HasPrefix(identifier, "!") {
358 // Resolve scoped identifier to DID, then fetch
359 did, err := s.resolveScopedIdentifier(ctx, identifier)
360 if err != nil {
361 return nil, fmt.Errorf("failed to resolve identifier %q: %w", originalIdentifier, err)
362 }
363 community, err := s.repo.GetByDID(ctx, did)
364 if err != nil {
365 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err)
366 }
367 return community, nil
368 }
369
370 // 3. At-identifier format: @handle (strip @ prefix)
371 identifier = strings.TrimPrefix(identifier, "@")
372
373 // 4. Canonical handle format: c-name.domain
374 if strings.Contains(identifier, ".") {
375 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
376 if err != nil {
377 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err)
378 }
379 return community, nil
380 }
381
382 return nil, NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
383}
384
385// GetByDID retrieves a community by its DID
386// Exported for use by post service when validating community references
387func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) {
388 if did == "" {
389 return nil, ErrInvalidInput
390 }
391
392 if !strings.HasPrefix(did, "did:") {
393 return nil, NewValidationError("did", "must be a valid DID")
394 }
395
396 return s.repo.GetByDID(ctx, did)
397}
398
399// UpdateCommunity updates a community via write-forward to PDS
400func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
401 if req.CommunityDID == "" {
402 return nil, NewValidationError("communityDid", "required")
403 }
404
405 if req.UpdatedByDID == "" {
406 return nil, NewValidationError("updatedByDid", "required")
407 }
408
409 // Get existing community
410 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
411 if err != nil {
412 return nil, err
413 }
414
415 // Authorization: verify user is the creator BEFORE any expensive operations
416 // This prevents unauthorized users from uploading orphaned blobs (DoS vector)
417 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
418 if existing.CreatedByDID != req.UpdatedByDID {
419 return nil, ErrUnauthorized
420 }
421
422 // CRITICAL: Ensure fresh PDS access token before write operation
423 // Community PDS tokens expire every ~2 hours and must be refreshed
424 existing, err = s.EnsureFreshToken(ctx, existing)
425 if err != nil {
426 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
427 }
428
429 // Upload avatar if provided
430 var avatarRef *blobs.BlobRef
431 if len(req.AvatarBlob) > 0 {
432 if req.AvatarMimeType == "" {
433 return nil, fmt.Errorf("avatarMimeType is required when avatarBlob is provided")
434 }
435 if s.blobService == nil {
436 return nil, fmt.Errorf("blob service not configured, cannot upload avatar")
437 }
438 ref, err := s.blobService.UploadBlob(ctx, existing, req.AvatarBlob, req.AvatarMimeType)
439 if err != nil {
440 return nil, fmt.Errorf("failed to upload avatar: %w", err)
441 }
442 avatarRef = ref
443 }
444
445 // Upload banner if provided
446 var bannerRef *blobs.BlobRef
447 if len(req.BannerBlob) > 0 {
448 if req.BannerMimeType == "" {
449 return nil, fmt.Errorf("bannerMimeType is required when bannerBlob is provided")
450 }
451 if s.blobService == nil {
452 return nil, fmt.Errorf("blob service not configured, cannot upload banner")
453 }
454 ref, err := s.blobService.UploadBlob(ctx, existing, req.BannerBlob, req.BannerMimeType)
455 if err != nil {
456 return nil, fmt.Errorf("failed to upload banner: %w", err)
457 }
458 bannerRef = ref
459 }
460
461 // Build updated profile record (start with existing)
462 profile := map[string]interface{}{
463 "$type": "social.coves.community.profile",
464 "name": existing.Name,
465 "owner": existing.OwnerDID,
466 "createdBy": existing.CreatedByDID,
467 "hostedBy": existing.HostedByDID,
468 "createdAt": existing.CreatedAt.Format(time.RFC3339),
469 }
470
471 // Apply updates
472 if req.DisplayName != nil {
473 profile["displayName"] = *req.DisplayName
474 } else {
475 profile["displayName"] = existing.DisplayName
476 }
477
478 if req.Description != nil {
479 profile["description"] = *req.Description
480 } else {
481 profile["description"] = existing.Description
482 }
483
484 if req.Visibility != nil {
485 profile["visibility"] = *req.Visibility
486 } else {
487 profile["visibility"] = existing.Visibility
488 }
489
490 if req.AllowExternalDiscovery != nil {
491 profile["federation"] = map[string]interface{}{
492 "allowExternalDiscovery": *req.AllowExternalDiscovery,
493 }
494 } else {
495 profile["federation"] = map[string]interface{}{
496 "allowExternalDiscovery": existing.AllowExternalDiscovery,
497 }
498 }
499
500 // Preserve moderation settings (even if empty)
501 // These fields are optional but should not be erased on update
502 if req.ModerationType != nil {
503 profile["moderationType"] = *req.ModerationType
504 } else if existing.ModerationType != "" {
505 profile["moderationType"] = existing.ModerationType
506 }
507
508 if len(req.ContentWarnings) > 0 {
509 profile["contentWarnings"] = req.ContentWarnings
510 } else if len(existing.ContentWarnings) > 0 {
511 profile["contentWarnings"] = existing.ContentWarnings
512 }
513
514 // Add blob references if uploaded
515 if avatarRef != nil {
516 profile["avatar"] = map[string]interface{}{
517 "$type": avatarRef.Type,
518 "ref": avatarRef.Ref,
519 "mimeType": avatarRef.MimeType,
520 "size": avatarRef.Size,
521 }
522 }
523
524 if bannerRef != nil {
525 profile["banner"] = map[string]interface{}{
526 "$type": bannerRef.Type,
527 "ref": bannerRef.Ref,
528 "mimeType": bannerRef.MimeType,
529 "size": bannerRef.Size,
530 }
531 }
532
533 // V2: Community profiles always use "self" as rkey
534 // (No need to extract from URI - it's always "self" for V2 communities)
535
536 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
537 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
538 // Authenticate as the community (not as instance!)
539 if existing.PDSAccessToken == "" {
540 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
541 }
542
543 recordURI, recordCID, err := s.putRecordOnPDSAs(
544 ctx,
545 existing.DID, // repo = community's own DID (V2!)
546 "social.coves.community.profile",
547 "self", // V2: always "self"
548 profile,
549 existing.PDSAccessToken, // authenticate as the community
550 )
551 if err != nil {
552 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
553 }
554
555 // Return updated community representation
556 // Actual AppView DB update happens via Jetstream consumer
557 updated := *existing
558 if req.DisplayName != nil {
559 updated.DisplayName = *req.DisplayName
560 }
561 if req.Description != nil {
562 updated.Description = *req.Description
563 }
564 if req.Visibility != nil {
565 updated.Visibility = *req.Visibility
566 }
567 if req.AllowExternalDiscovery != nil {
568 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
569 }
570 if req.ModerationType != nil {
571 updated.ModerationType = *req.ModerationType
572 }
573 if len(req.ContentWarnings) > 0 {
574 updated.ContentWarnings = req.ContentWarnings
575 }
576 updated.RecordURI = recordURI
577 updated.RecordCID = recordCID
578 updated.UpdatedAt = time.Now()
579
580 return &updated, nil
581}
582
583// getOrCreateRefreshMutex returns a mutex for the given community DID
584// Thread-safe with read-lock fast path for existing entries
585// SAFETY: Does NOT evict entries to avoid race condition where:
586// 1. Thread A holds mutex for community-123
587// 2. Thread B evicts community-123 from map
588// 3. Thread C creates NEW mutex for community-123
589// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
590func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
591 // Fast path: check if mutex already exists (read lock)
592 s.mapMutex.RLock()
593 mutex, exists := s.refreshMutexes[did]
594 s.mapMutex.RUnlock()
595
596 if exists {
597 return mutex
598 }
599
600 // Slow path: create new mutex (write lock)
601 s.mapMutex.Lock()
602 defer s.mapMutex.Unlock()
603
604 // Double-check after acquiring write lock (another goroutine might have created it)
605 mutex, exists = s.refreshMutexes[did]
606 if exists {
607 return mutex
608 }
609
610 // Create new mutex
611 mutex = &sync.Mutex{}
612 s.refreshMutexes[did] = mutex
613
614 // SAFETY: No eviction to prevent race condition
615 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
616 if len(s.refreshMutexes) > maxMutexCacheSize {
617 memoryKB := len(s.refreshMutexes) * 16 / 1024
618 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
619 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
620 }
621
622 return mutex
623}
624
// EnsureFreshToken ensures the community's PDS access token is valid,
// refreshing and persisting new credentials when NeedsRefresh reports that a
// refresh is due.
// Exported for use by post service when writing posts to community repos.
//
// It always re-reads the community from the repository while holding the
// per-community mutex, so the returned value is either that fresh copy (no
// refresh needed) or a copy of it carrying the new access/refresh tokens.
// Thread-safe: uses a per-community mutex to prevent concurrent refresh
// attempts for the same DID; different communities can refresh concurrently.
//
// Errors are returned when the re-fetch fails, the token cannot be parsed,
// the refresh (and, for an expired refresh token, the password fallback)
// fails, or the new credentials cannot be persisted after all retries.
// The community argument must be non-nil: its DID is read immediately.
func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) {
	// Get or create mutex for this specific community DID
	mutex := s.getOrCreateRefreshMutex(community.DID)

	// Lock for this specific community (allows other communities to refresh concurrently)
	mutex.Lock()
	defer mutex.Unlock()

	// Re-fetch community from DB (another goroutine might have already refreshed it)
	fresh, err := s.repo.GetByDID(ctx, community.DID)
	if err != nil {
		return nil, fmt.Errorf("failed to re-fetch community: %w", err)
	}

	// Check if token needs refresh (5-minute buffer before expiration)
	// NOTE(review): the buffer size lives in NeedsRefresh, which is not visible here - confirm.
	needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
	if err != nil {
		log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
		return nil, fmt.Errorf("failed to check token expiration: %w", err)
	}

	if !needsRefresh {
		// Token still valid, no refresh needed
		return fresh, nil
	}

	log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)

	// Attempt token refresh using refresh token
	newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSRefreshToken)
	if err != nil {
		// Check if refresh token expired (need password fallback)
		// Match both "ExpiredToken" and "Token has expired" error messages
		// (case-insensitive substring match on the error text)
		if strings.Contains(strings.ToLower(err.Error()), "expired") {
			log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)

			// Fallback: Re-authenticate with stored password
			newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
				ctx,
				fresh.PDSURL,
				fresh.PDSEmail,
				fresh.PDSPassword, // Retrieved decrypted from DB
			)
			if err != nil {
				log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
				return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
			}

			log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
		} else {
			log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
			return nil, fmt.Errorf("failed to refresh token: %w", err)
		}
	}

	// CRITICAL: Update database with new tokens immediately
	// Refresh tokens are SINGLE-USE - old one is now invalid
	// Use retry logic to handle transient DB failures
	const maxRetries = 3
	var updateErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
		if updateErr == nil {
			break // Success
		}

		log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
			fresh.DID, attempt+1, maxRetries, updateErr)

		if attempt < maxRetries-1 {
			// Exponential backoff between attempts: 100ms, then 200ms.
			// No sleep follows the final attempt. The sleep does not observe ctx.
			backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
			time.Sleep(backoff)
		}
	}

	if updateErr != nil {
		// CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
		log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
			fresh.DID, maxRetries, updateErr)
		// TODO: Send alert to monitoring system (add in Beta)
		return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
			maxRetries, updateErr)
	}

	// Return updated community object with fresh tokens
	// (a shallow copy, so the re-fetched value itself is left untouched)
	updatedCommunity := *fresh
	updatedCommunity.PDSAccessToken = newAccessToken
	updatedCommunity.PDSRefreshToken = newRefreshToken

	log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)

	return &updatedCommunity, nil
}
724
725// ListCommunities queries AppView DB for communities with filters
726func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, error) {
727 // Set defaults
728 if req.Limit <= 0 || req.Limit > 100 {
729 req.Limit = 50
730 }
731
732 return s.repo.List(ctx, req)
733}
734
735// SearchCommunities performs fuzzy search in AppView DB
736func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
737 if req.Query == "" {
738 return nil, 0, NewValidationError("query", "search query is required")
739 }
740
741 // Set defaults
742 if req.Limit <= 0 || req.Limit > 100 {
743 req.Limit = 50
744 }
745
746 return s.repo.Search(ctx, req)
747}
748
749// SubscribeToCommunity creates a subscription via write-forward to PDS
750// Uses OAuth session with DPoP authentication for secure PDS communication
751func (s *communityService) SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*Subscription, error) {
752 if session == nil {
753 return nil, NewValidationError("session", "required")
754 }
755
756 userDID := session.AccountDID.String()
757
758 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
759 if contentVisibility <= 0 || contentVisibility > 5 {
760 contentVisibility = 3
761 }
762
763 // Resolve community identifier to DID
764 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
765 if err != nil {
766 return nil, fmt.Errorf("subscribe: %w", err)
767 }
768
769 // Verify community exists
770 community, err := s.repo.GetByDID(ctx, communityDID)
771 if err != nil {
772 return nil, err
773 }
774
775 // Check visibility - can't subscribe to private communities without invitation (TODO)
776 if community.Visibility == "private" {
777 return nil, ErrUnauthorized
778 }
779
780 // Create PDS client for this session (DPoP authentication)
781 pdsClient, err := s.getPDSClient(ctx, session)
782 if err != nil {
783 return nil, fmt.Errorf("failed to create PDS client: %w", err)
784 }
785
786 // Generate TID for record key
787 tid := syntax.NewTIDNow(0)
788
789 // Build subscription record
790 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
791 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
792 // Following atProto conventions, we use "subject" field to reference the community
793 subRecord := map[string]interface{}{
794 "$type": "social.coves.community.subscription",
795 "subject": communityDID, // atProto convention: "subject" for entity references
796 "createdAt": time.Now().Format(time.RFC3339),
797 "contentVisibility": contentVisibility,
798 }
799
800 // Write-forward: create subscription record in user's repo using DPoP-authenticated client
801 recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.subscription", tid.String(), subRecord)
802 if err != nil {
803 if pds.IsAuthError(err) {
804 return nil, ErrUnauthorized
805 }
806 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
807 }
808
809 // Return subscription representation
810 subscription := &Subscription{
811 UserDID: userDID,
812 CommunityDID: communityDID,
813 ContentVisibility: contentVisibility,
814 SubscribedAt: time.Now(),
815 RecordURI: recordURI,
816 RecordCID: recordCID,
817 }
818
819 return subscription, nil
820}
821
822// UnsubscribeFromCommunity removes a subscription via PDS delete
823// Uses OAuth session with DPoP authentication for secure PDS communication
824func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error {
825 if session == nil {
826 return NewValidationError("session", "required")
827 }
828
829 userDID := session.AccountDID.String()
830
831 // Resolve community identifier
832 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
833 if err != nil {
834 return fmt.Errorf("unsubscribe: %w", err)
835 }
836
837 // Get the subscription from AppView to find the record key
838 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
839 if err != nil {
840 return err
841 }
842
843 // Extract rkey from record URI (at://did/collection/rkey)
844 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
845 if rkey == "" {
846 return fmt.Errorf("invalid subscription record URI")
847 }
848
849 // Create PDS client for this session (DPoP authentication)
850 pdsClient, err := s.getPDSClient(ctx, session)
851 if err != nil {
852 return fmt.Errorf("failed to create PDS client: %w", err)
853 }
854
855 // Write-forward: delete record from PDS using DPoP-authenticated client
856 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
857 if err := pdsClient.DeleteRecord(ctx, "social.coves.community.subscription", rkey); err != nil {
858 if pds.IsAuthError(err) {
859 return ErrUnauthorized
860 }
861 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
862 }
863
864 return nil
865}
866
867// GetUserSubscriptions queries AppView DB for user's subscriptions
868func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
869 if limit <= 0 || limit > 100 {
870 limit = 50
871 }
872
873 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
874}
875
876// GetCommunitySubscribers queries AppView DB for community subscribers
877func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
878 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
879 if err != nil {
880 return nil, err
881 }
882
883 if limit <= 0 || limit > 100 {
884 limit = 50
885 }
886
887 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
888}
889
890// GetMembership retrieves membership info from AppView DB
891func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
892 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
893 if err != nil {
894 return nil, err
895 }
896
897 return s.repo.GetMembership(ctx, userDID, communityDID)
898}
899
900// ListCommunityMembers queries AppView DB for members
901func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
902 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
903 if err != nil {
904 return nil, err
905 }
906
907 if limit <= 0 || limit > 100 {
908 limit = 50
909 }
910
911 return s.repo.ListMembers(ctx, communityDID, limit, offset)
912}
913
914// BlockCommunity blocks a community via write-forward to PDS
915// Uses OAuth session with DPoP authentication for secure PDS communication
916func (s *communityService) BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*CommunityBlock, error) {
917 if session == nil {
918 return nil, NewValidationError("session", "required")
919 }
920
921 userDID := session.AccountDID.String()
922
923 // Resolve community identifier (also verifies community exists)
924 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
925 if err != nil {
926 return nil, fmt.Errorf("block: %w", err)
927 }
928
929 // Create PDS client for this session (DPoP authentication)
930 pdsClient, err := s.getPDSClient(ctx, session)
931 if err != nil {
932 return nil, fmt.Errorf("block: failed to create PDS client: %w", err)
933 }
934
935 // Generate TID for record key
936 tid := syntax.NewTIDNow(0)
937
938 // Build block record
939 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
940 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
941 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
942 blockRecord := map[string]interface{}{
943 "$type": "social.coves.community.block",
944 "subject": communityDID, // DID of community being blocked
945 "createdAt": time.Now().Format(time.RFC3339),
946 }
947
948 // Write-forward: create block record in user's repo using DPoP-authenticated client
949 // Note: We don't check for existing blocks first because:
950 // 1. The PDS may reject duplicates (depending on implementation)
951 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
952 // 3. This avoids a race condition where two concurrent requests both pass the check
953 recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.block", tid.String(), blockRecord)
954 if err != nil {
955 // Check for auth errors first
956 if pds.IsAuthError(err) {
957 return nil, ErrUnauthorized
958 }
959
960 // Check if this is a duplicate/conflict error from PDS
961 if pds.IsConflictError(err) {
962 // Fetch and return existing block from our indexed view
963 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
964 if getErr == nil {
965 // Block exists in our index - return it
966 return existingBlock, nil
967 }
968 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
969 // Any other error (DB outage, connection failure, etc.) should bubble up
970 if errors.Is(getErr, ErrBlockNotFound) {
971 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
972 // Return typed conflict error so handler can return 409 instead of 500
973 // This is normal in eventually-consistent systems
974 return nil, ErrBlockAlreadyExists
975 }
976 // Real datastore error - bubble it up so operators see the failure
977 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
978 }
979 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
980 }
981
982 // Return block representation
983 block := &CommunityBlock{
984 UserDID: userDID,
985 CommunityDID: communityDID,
986 BlockedAt: time.Now(),
987 RecordURI: recordURI,
988 RecordCID: recordCID,
989 }
990
991 return block, nil
992}
993
994// UnblockCommunity removes a block via PDS delete
995// Uses OAuth session with DPoP authentication for secure PDS communication
996func (s *communityService) UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error {
997 if session == nil {
998 return NewValidationError("session", "required")
999 }
1000
1001 userDID := session.AccountDID.String()
1002
1003 // Resolve community identifier
1004 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
1005 if err != nil {
1006 return fmt.Errorf("unblock: %w", err)
1007 }
1008
1009 // Get the block from AppView to find the record key
1010 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
1011 if err != nil {
1012 return err
1013 }
1014
1015 // Extract rkey from record URI (at://did/collection/rkey)
1016 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
1017 if rkey == "" {
1018 return fmt.Errorf("invalid block record URI")
1019 }
1020
1021 // Create PDS client for this session (DPoP authentication)
1022 pdsClient, err := s.getPDSClient(ctx, session)
1023 if err != nil {
1024 return fmt.Errorf("failed to create PDS client: %w", err)
1025 }
1026
1027 // Write-forward: delete record from PDS using DPoP-authenticated client
1028 if err := pdsClient.DeleteRecord(ctx, "social.coves.community.block", rkey); err != nil {
1029 if pds.IsAuthError(err) {
1030 return ErrUnauthorized
1031 }
1032 return fmt.Errorf("failed to delete block on PDS: %w", err)
1033 }
1034
1035 return nil
1036}
1037
1038// GetBlockedCommunities queries AppView DB for user's blocks
1039func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
1040 if limit <= 0 || limit > 100 {
1041 limit = 50
1042 }
1043
1044 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
1045}
1046
1047// IsBlocked checks if a user has blocked a community
1048func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
1049 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
1050 if err != nil {
1051 return false, err
1052 }
1053
1054 return s.repo.IsBlocked(ctx, userDID, communityDID)
1055}
1056
1057// ValidateHandle checks if a community handle is valid
1058func (s *communityService) ValidateHandle(handle string) error {
1059 if handle == "" {
1060 return NewValidationError("handle", "required")
1061 }
1062
1063 if !communityHandleRegex.MatchString(handle) {
1064 return ErrInvalidHandle
1065 }
1066
1067 return nil
1068}
1069
1070// ResolveCommunityIdentifier converts a community identifier to a DID
1071// Following Bluesky's pattern with Coves extensions:
1072//
1073// Accepts (like Bluesky's at-identifier):
1074// 1. DID: did:plc:abc123 (pass through)
1075// 2. Canonical handle: gardening.community.coves.social (atProto standard)
1076// 3. At-identifier: @gardening.community.coves.social (strip @ prefix)
1077//
1078// Coves-specific extensions:
1079// 4. Scoped format: !gardening@coves.social (parse and resolve)
1080//
1081// Returns: DID string
1082func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
1083 identifier = strings.TrimSpace(identifier)
1084
1085 if identifier == "" {
1086 return "", ErrInvalidInput
1087 }
1088
1089 // 1. DID - verify it exists and return (Bluesky standard)
1090 if strings.HasPrefix(identifier, "did:") {
1091 _, err := s.repo.GetByDID(ctx, identifier)
1092 if err != nil {
1093 if IsNotFound(err) {
1094 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err)
1095 }
1096 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err)
1097 }
1098 return identifier, nil
1099 }
1100
1101 // 2. Scoped format: !name@instance (Coves-specific)
1102 if strings.HasPrefix(identifier, "!") {
1103 return s.resolveScopedIdentifier(ctx, identifier)
1104 }
1105
1106 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix)
1107 identifier = strings.TrimPrefix(identifier, "@")
1108
1109 // 4. Canonical handle: name.community.instance.com (Bluesky standard)
1110 if strings.Contains(identifier, ".") {
1111 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
1112 if err != nil {
1113 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err)
1114 }
1115 return community.DID, nil
1116 }
1117
1118 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
1119}
1120
1121// resolveScopedIdentifier handles Coves-specific !name@instance format
1122// Formats accepted:
1123//
1124// !gardening@coves.social -> c-gardening.coves.social
1125func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) {
1126 // Remove ! prefix
1127 scoped = strings.TrimPrefix(scoped, "!")
1128
1129 var name string
1130 var instanceDomain string
1131
1132 // Parse !name@instance
1133 if !strings.Contains(scoped, "@") {
1134 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)")
1135 }
1136
1137 parts := strings.SplitN(scoped, "@", 2)
1138 name = strings.TrimSpace(parts[0])
1139 instanceDomain = strings.TrimSpace(parts[1])
1140
1141 // Validate name format
1142 if name == "" {
1143 return "", NewValidationError("identifier", "community name cannot be empty")
1144 }
1145
1146 // Validate name is a valid DNS label (RFC 1035)
1147 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen
1148 if !isValidDNSLabel(name) {
1149 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)")
1150 }
1151
1152 // Validate instance domain format
1153 if !isValidDomain(instanceDomain) {
1154 return "", NewValidationError("identifier", "invalid instance domain format")
1155 }
1156
1157 // Normalize domain to lowercase (DNS is case-insensitive)
1158 // This fixes the bug where !gardening@Coves.social would fail lookup
1159 instanceDomain = strings.ToLower(instanceDomain)
1160
1161 // Validate the instance matches this server
1162 if !s.isLocalInstance(instanceDomain) {
1163 return "", NewValidationError("identifier",
1164 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain))
1165 }
1166
1167 // Construct canonical handle: c-{name}.{instanceDomain}
1168 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup
1169 canonicalHandle := fmt.Sprintf("c-%s.%s",
1170 strings.ToLower(name),
1171 instanceDomain) // Already normalized to lowercase above
1172
1173 // Look up by canonical handle
1174 community, err := s.repo.GetByHandle(ctx, canonicalHandle)
1175 if err != nil {
1176 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err)
1177 }
1178
1179 return community.DID, nil
1180}
1181
1182// isLocalInstance checks if the provided domain matches this instance
1183func (s *communityService) isLocalInstance(domain string) bool {
1184 // Normalize both domains
1185 domain = strings.ToLower(strings.TrimSpace(domain))
1186 instanceDomain := strings.ToLower(s.instanceDomain)
1187
1188 // Direct match
1189 return domain == instanceDomain
1190}
1191
1192// Validation helpers
1193
1194// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035
1195// - 1-63 characters
1196// - Alphanumeric and hyphens only
1197// - Cannot start or end with hyphen
1198func isValidDNSLabel(label string) bool {
1199 return dnsLabelRegex.MatchString(label)
1200}
1201
1202// isValidDomain validates that a string is a valid domain name
1203// Simplified validation - checks basic DNS hostname structure
1204func isValidDomain(domain string) bool {
1205 if domain == "" || len(domain) > 253 {
1206 return false
1207 }
1208 return domainRegex.MatchString(domain)
1209}
1210
1211func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
1212 if req.Name == "" {
1213 return NewValidationError("name", "required")
1214 }
1215
1216 // DNS label limit: 63 characters per label
1217 // Community handle format: {name}.community.{instanceDomain}
1218 // The first label is just req.Name, so it must be <= 63 chars
1219 if len(req.Name) > 63 {
1220 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
1221 }
1222
1223 // Name can only contain alphanumeric and hyphens
1224 // Must start and end with alphanumeric (not hyphen)
1225 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
1226 if !nameRegex.MatchString(req.Name) {
1227 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
1228 }
1229
1230 if req.Description != "" && len(req.Description) > 3000 {
1231 return NewValidationError("description", "must be 3000 characters or less")
1232 }
1233
1234 // Visibility should already be set with default in CreateCommunity
1235 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
1236 return ErrInvalidVisibility
1237 }
1238
1239 if req.CreatedByDID == "" {
1240 return NewValidationError("createdByDid", "required")
1241 }
1242
1243 // hostedByDID is auto-populated by the service layer, no validation needed
1244 // The handler ensures clients cannot provide this field
1245
1246 return nil
1247}
1248
1249// PDS write-forward helpers
1250
1251// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
1252func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1253 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
1254
1255 payload := map[string]interface{}{
1256 "repo": repoDID,
1257 "collection": collection,
1258 "record": record,
1259 }
1260
1261 if rkey != "" {
1262 payload["rkey"] = rkey
1263 }
1264
1265 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1266}
1267
1268// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
1269func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1270 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
1271
1272 payload := map[string]interface{}{
1273 "repo": repoDID,
1274 "collection": collection,
1275 "rkey": rkey,
1276 "record": record,
1277 }
1278
1279 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1280}
1281
1282// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
1283func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
1284 jsonData, err := json.Marshal(payload)
1285 if err != nil {
1286 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
1287 }
1288
1289 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
1290 if err != nil {
1291 return "", "", fmt.Errorf("failed to create request: %w", err)
1292 }
1293 req.Header.Set("Content-Type", "application/json")
1294
1295 // Add authentication with provided access token
1296 if accessToken != "" {
1297 req.Header.Set("Authorization", "Bearer "+accessToken)
1298 }
1299
1300 // Dynamic timeout based on operation type
1301 // Write operations (createAccount, createRecord, putRecord) are slower due to:
1302 // - Keypair generation
1303 // - DID PLC registration
1304 // - Database writes on PDS
1305 timeout := 10 * time.Second // Default for read operations
1306 if strings.Contains(endpoint, "createAccount") ||
1307 strings.Contains(endpoint, "createRecord") ||
1308 strings.Contains(endpoint, "putRecord") {
1309 timeout = 30 * time.Second // Extended timeout for write operations
1310 }
1311
1312 client := &http.Client{Timeout: timeout}
1313 resp, err := client.Do(req)
1314 if err != nil {
1315 return "", "", fmt.Errorf("failed to call PDS: %w", err)
1316 }
1317 defer func() {
1318 if closeErr := resp.Body.Close(); closeErr != nil {
1319 log.Printf("Failed to close response body: %v", closeErr)
1320 }
1321 }()
1322
1323 body, err := io.ReadAll(resp.Body)
1324 if err != nil {
1325 return "", "", fmt.Errorf("failed to read response: %w", err)
1326 }
1327
1328 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
1329 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
1330 }
1331
1332 // Parse response to extract URI and CID
1333 var result struct {
1334 URI string `json:"uri"`
1335 CID string `json:"cid"`
1336 }
1337 if err := json.Unmarshal(body, &result); err != nil {
1338 // For delete operations, there might not be a response body
1339 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1340 return "", "", nil
1341 }
1342 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1343 }
1344
1345 return result.URI, result.CID, nil
1346}
1347
1348// Helper functions