A community-based topic aggregation platform built on atproto
at main 1348 lines 48 kB view raw
1package communities 2 3import ( 4 oauthclient "Coves/internal/atproto/oauth" 5 "Coves/internal/atproto/pds" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/blobs" 8 "bytes" 9 "context" 10 "encoding/json" 11 "errors" 12 "fmt" 13 "io" 14 "log" 15 "net/http" 16 "regexp" 17 "strings" 18 "sync" 19 "time" 20 21 "github.com/bluesky-social/indigo/atproto/auth/oauth" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23) 24 25// Community handle validation regex (DNS-valid handle: name.community.instance.com) 26// Matches standard DNS hostname format (RFC 1035) 27var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 28 29// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen) 30var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 31 32// Domain validation (simplified - checks for valid DNS hostname structure) 33var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 34 35// PDSClientFactory creates PDS clients from session data. 36// Used to allow injection of different auth mechanisms (OAuth for production, password for tests). 37type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) 38 39type communityService struct { 40 // Interfaces and pointers first (better alignment) 41 repo Repository 42 provisioner *PDSAccountProvisioner 43 blobService blobs.Service 44 45 // OAuth client for user PDS authentication (DPoP-based) 46 oauthClient *oauthclient.OAuthClient 47 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth. 
48 49 // Token refresh concurrency control 50 // Each community gets its own mutex to prevent concurrent refresh attempts 51 refreshMutexes map[string]*sync.Mutex 52 53 // Strings 54 pdsURL string 55 instanceDID string 56 instanceDomain string 57 pdsAccessToken string 58 59 // Sync primitives last 60 mapMutex sync.RWMutex // Protects refreshMutexes map itself 61} 62 63const ( 64 // Maximum recommended size for mutex cache (warning threshold, not hard limit) 65 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead) 66 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable 67 maxMutexCacheSize = 10000 68) 69 70// NewCommunityService creates a new community service with OAuth client for user authentication 71func NewCommunityService( 72 repo Repository, 73 pdsURL, instanceDID, instanceDomain string, 74 provisioner *PDSAccountProvisioner, 75 oauthClient *oauthclient.OAuthClient, 76 blobService blobs.Service, 77) Service { 78 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 79 // This catches honest configuration mistakes but NOT malicious code modifications 80 // Full verification (Phase 2) requires fetching DID document from domain 81 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 82 if strings.HasPrefix(instanceDID, "did:web:") { 83 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 84 if didDomain != instanceDomain { 85 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 86 didDomain, instanceDomain) 87 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 88 log.Printf(" Communities will be hosted by: %s", instanceDID) 89 } 90 } 91 92 return &communityService{ 93 repo: repo, 94 pdsURL: pdsURL, 95 instanceDID: instanceDID, 96 instanceDomain: instanceDomain, 97 provisioner: provisioner, 98 oauthClient: oauthClient, 99 blobService: blobService, 100 refreshMutexes: 
make(map[string]*sync.Mutex), 101 } 102} 103 104// NewCommunityServiceWithPDSFactory creates a community service with a custom PDS client factory. 105// This is primarily for testing with password-based authentication. 106func NewCommunityServiceWithPDSFactory( 107 repo Repository, 108 pdsURL, instanceDID, instanceDomain string, 109 provisioner *PDSAccountProvisioner, 110 factory PDSClientFactory, 111 blobService blobs.Service, 112) Service { 113 return &communityService{ 114 repo: repo, 115 pdsURL: pdsURL, 116 instanceDID: instanceDID, 117 instanceDomain: instanceDomain, 118 provisioner: provisioner, 119 pdsClientFactory: factory, 120 blobService: blobService, 121 refreshMutexes: make(map[string]*sync.Mutex), 122 } 123} 124 125// SetPDSAccessToken sets the PDS access token for authentication 126// This should be called after creating a session for the Coves instance DID on the PDS 127func (s *communityService) SetPDSAccessToken(token string) { 128 s.pdsAccessToken = token 129} 130 131// getPDSClient creates a PDS client from an OAuth session. 132// If a custom factory was provided (for testing), uses that. 133// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling. 
134func (s *communityService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) { 135 // Use custom factory if provided (e.g., for testing with password auth) 136 if s.pdsClientFactory != nil { 137 return s.pdsClientFactory(ctx, session) 138 } 139 140 // Production path: use OAuth with DPoP 141 if s.oauthClient == nil || s.oauthClient.ClientApp == nil { 142 log.Printf("[OAUTH_ERROR] getPDSClient called but OAuth client is not configured - check server initialization") 143 return nil, fmt.Errorf("OAuth client not configured") 144 } 145 146 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session) 147 if err != nil { 148 return nil, fmt.Errorf("failed to create PDS client: %w", err) 149 } 150 151 return client, nil 152} 153 154// CreateCommunity creates a new community via write-forward to PDS 155// V2 Flow: 156// 1. Service creates PDS account for community (PDS generates signing keypair) 157// 2. Service writes community profile to COMMUNITY's own repository 158// 3. Firehose emits event 159// 4. 
Consumer indexes to AppView DB 160// 161// V2 Architecture: 162// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 163// - PDS manages the signing keypair (we never see it) 164// - We store PDS credentials to act on behalf of the community 165// - Community can migrate to other instances (future V2.1 with rotation keys) 166func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 167 // Apply defaults before validation 168 if req.Visibility == "" { 169 req.Visibility = "public" 170 } 171 172 // SECURITY: Auto-populate hostedByDID from instance configuration 173 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 174 // This prevents malicious instances from claiming to host communities for domains they don't own 175 req.HostedByDID = s.instanceDID 176 177 // Validate request 178 if err := s.validateCreateRequest(req); err != nil { 179 return nil, err 180 } 181 182 // V2: Provision a real PDS account for this community 183 // This calls com.atproto.server.createAccount internally 184 // The PDS will: 185 // 1. Generate a signing keypair (stored in PDS, we never see it) 186 // 2. Create a DID (did:plc:xxx) 187 // 3. 
Return credentials (DID, tokens) 188 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 189 if err != nil { 190 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 191 } 192 193 // Validate the atProto handle 194 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 195 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 196 } 197 198 // Build community profile record 199 profile := map[string]interface{}{ 200 "$type": "social.coves.community.profile", 201 "name": req.Name, // Short name for !mentions (e.g., "gaming") 202 "visibility": req.Visibility, 203 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 204 "createdBy": req.CreatedByDID, 205 "createdAt": time.Now().Format(time.RFC3339), 206 "federation": map[string]interface{}{ 207 "allowExternalDiscovery": req.AllowExternalDiscovery, 208 }, 209 } 210 211 // Add optional fields 212 if req.DisplayName != "" { 213 profile["displayName"] = req.DisplayName 214 } 215 if req.Description != "" { 216 profile["description"] = req.Description 217 } 218 if len(req.Rules) > 0 { 219 profile["rules"] = req.Rules 220 } 221 if len(req.Categories) > 0 { 222 profile["categories"] = req.Categories 223 } 224 if req.Language != "" { 225 profile["language"] = req.Language 226 } 227 228 // Track blob CIDs for storage in the community record 229 var avatarCID, bannerCID string 230 231 // Upload avatar if provided 232 if len(req.AvatarBlob) > 0 { 233 if req.AvatarMimeType == "" { 234 return nil, fmt.Errorf("avatarMimeType is required when avatarBlob is provided") 235 } 236 if s.blobService == nil { 237 return nil, fmt.Errorf("blob service not configured, cannot upload avatar") 238 } 239 avatarRef, err := s.blobService.UploadBlob(ctx, pdsAccount, req.AvatarBlob, req.AvatarMimeType) 240 if err != nil { 241 return nil, fmt.Errorf("failed to upload avatar: %w", err) 242 } 243 profile["avatar"] = map[string]interface{}{ 244 
"$type": avatarRef.Type, 245 "ref": avatarRef.Ref, 246 "mimeType": avatarRef.MimeType, 247 "size": avatarRef.Size, 248 } 249 // Extract CID for database storage 250 avatarCID = avatarRef.Ref["$link"] 251 } 252 253 // Upload banner if provided 254 if len(req.BannerBlob) > 0 { 255 if req.BannerMimeType == "" { 256 return nil, fmt.Errorf("bannerMimeType is required when bannerBlob is provided") 257 } 258 if s.blobService == nil { 259 return nil, fmt.Errorf("blob service not configured, cannot upload banner") 260 } 261 bannerRef, err := s.blobService.UploadBlob(ctx, pdsAccount, req.BannerBlob, req.BannerMimeType) 262 if err != nil { 263 return nil, fmt.Errorf("failed to upload banner: %w", err) 264 } 265 profile["banner"] = map[string]interface{}{ 266 "$type": bannerRef.Type, 267 "ref": bannerRef.Ref, 268 "mimeType": bannerRef.MimeType, 269 "size": bannerRef.Size, 270 } 271 // Extract CID for database storage 272 bannerCID = bannerRef.Ref["$link"] 273 } 274 275 // V2: Write to COMMUNITY's own repository (not instance repo!) 276 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 277 // Authenticate using community's access token 278 recordURI, recordCID, err := s.createRecordOnPDSAs( 279 ctx, 280 pdsAccount.DID, // repo = community's DID (community owns its repo!) 281 "social.coves.community.profile", 282 "self", // canonical rkey for profile 283 profile, 284 pdsAccount.AccessToken, // authenticate as the community 285 ) 286 if err != nil { 287 return nil, fmt.Errorf("failed to create community profile record: %w", err) 288 } 289 290 // Build Community object with PDS credentials AND cryptographic keys 291 community := &Community{ 292 DID: pdsAccount.DID, // Community's DID (owns the repo!) 
293 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social) 294 Name: req.Name, 295 DisplayName: req.DisplayName, 296 Description: req.Description, 297 OwnerDID: pdsAccount.DID, // V2: Community owns itself 298 CreatedByDID: req.CreatedByDID, 299 HostedByDID: req.HostedByDID, 300 PDSEmail: pdsAccount.Email, 301 PDSPassword: pdsAccount.Password, 302 PDSAccessToken: pdsAccount.AccessToken, 303 PDSRefreshToken: pdsAccount.RefreshToken, 304 PDSURL: pdsAccount.PDSURL, 305 Visibility: req.Visibility, 306 AllowExternalDiscovery: req.AllowExternalDiscovery, 307 AvatarCID: avatarCID, 308 BannerCID: bannerCID, 309 MemberCount: 0, 310 SubscriberCount: 0, 311 CreatedAt: time.Now(), 312 UpdatedAt: time.Now(), 313 RecordURI: recordURI, 314 RecordCID: recordCID, 315 // V2: Cryptographic keys for portability (will be encrypted by repository) 316 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 317 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 318 } 319 320 // CRITICAL: Persist PDS credentials immediately to database 321 // The Jetstream consumer will eventually index the community profile from the firehose, 322 // but it won't have the PDS credentials. We must store them now so we can: 323 // 1. Update the community profile later (using its own credentials) 324 // 2. 
Re-authenticate if access tokens expire 325 _, err = s.repo.Create(ctx, community) 326 if err != nil { 327 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 328 } 329 330 return community, nil 331} 332 333// GetCommunity retrieves a community from AppView DB 334// identifier can be: 335// - DID: did:plc:xxx 336// - Scoped handle: !name@instance 337// - At-identifier: @c-name.domain 338// - Canonical handle: c-name.domain 339func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 340 originalIdentifier := identifier 341 identifier = strings.TrimSpace(identifier) 342 343 if identifier == "" { 344 return nil, ErrInvalidInput 345 } 346 347 // 1. DID format 348 if strings.HasPrefix(identifier, "did:") { 349 community, err := s.repo.GetByDID(ctx, identifier) 350 if err != nil { 351 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err) 352 } 353 return community, nil 354 } 355 356 // 2. Scoped format: !name@instance 357 if strings.HasPrefix(identifier, "!") { 358 // Resolve scoped identifier to DID, then fetch 359 did, err := s.resolveScopedIdentifier(ctx, identifier) 360 if err != nil { 361 return nil, fmt.Errorf("failed to resolve identifier %q: %w", originalIdentifier, err) 362 } 363 community, err := s.repo.GetByDID(ctx, did) 364 if err != nil { 365 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err) 366 } 367 return community, nil 368 } 369 370 // 3. At-identifier format: @handle (strip @ prefix) 371 identifier = strings.TrimPrefix(identifier, "@") 372 373 // 4. 
Canonical handle format: c-name.domain 374 if strings.Contains(identifier, ".") { 375 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier)) 376 if err != nil { 377 return nil, fmt.Errorf("community not found for identifier %q: %w", originalIdentifier, err) 378 } 379 return community, nil 380 } 381 382 return nil, NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)") 383} 384 385// GetByDID retrieves a community by its DID 386// Exported for use by post service when validating community references 387func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) { 388 if did == "" { 389 return nil, ErrInvalidInput 390 } 391 392 if !strings.HasPrefix(did, "did:") { 393 return nil, NewValidationError("did", "must be a valid DID") 394 } 395 396 return s.repo.GetByDID(ctx, did) 397} 398 399// UpdateCommunity updates a community via write-forward to PDS 400func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 401 if req.CommunityDID == "" { 402 return nil, NewValidationError("communityDid", "required") 403 } 404 405 if req.UpdatedByDID == "" { 406 return nil, NewValidationError("updatedByDid", "required") 407 } 408 409 // Get existing community 410 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 411 if err != nil { 412 return nil, err 413 } 414 415 // Authorization: verify user is the creator BEFORE any expensive operations 416 // This prevents unauthorized users from uploading orphaned blobs (DoS vector) 417 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 418 if existing.CreatedByDID != req.UpdatedByDID { 419 return nil, ErrUnauthorized 420 } 421 422 // CRITICAL: Ensure fresh PDS access token before write operation 423 // Community PDS tokens expire every ~2 hours and must be refreshed 424 existing, err = s.EnsureFreshToken(ctx, existing) 425 if err != nil { 426 return nil, 
fmt.Errorf("failed to ensure fresh credentials: %w", err) 427 } 428 429 // Upload avatar if provided 430 var avatarRef *blobs.BlobRef 431 if len(req.AvatarBlob) > 0 { 432 if req.AvatarMimeType == "" { 433 return nil, fmt.Errorf("avatarMimeType is required when avatarBlob is provided") 434 } 435 if s.blobService == nil { 436 return nil, fmt.Errorf("blob service not configured, cannot upload avatar") 437 } 438 ref, err := s.blobService.UploadBlob(ctx, existing, req.AvatarBlob, req.AvatarMimeType) 439 if err != nil { 440 return nil, fmt.Errorf("failed to upload avatar: %w", err) 441 } 442 avatarRef = ref 443 } 444 445 // Upload banner if provided 446 var bannerRef *blobs.BlobRef 447 if len(req.BannerBlob) > 0 { 448 if req.BannerMimeType == "" { 449 return nil, fmt.Errorf("bannerMimeType is required when bannerBlob is provided") 450 } 451 if s.blobService == nil { 452 return nil, fmt.Errorf("blob service not configured, cannot upload banner") 453 } 454 ref, err := s.blobService.UploadBlob(ctx, existing, req.BannerBlob, req.BannerMimeType) 455 if err != nil { 456 return nil, fmt.Errorf("failed to upload banner: %w", err) 457 } 458 bannerRef = ref 459 } 460 461 // Build updated profile record (start with existing) 462 profile := map[string]interface{}{ 463 "$type": "social.coves.community.profile", 464 "name": existing.Name, 465 "owner": existing.OwnerDID, 466 "createdBy": existing.CreatedByDID, 467 "hostedBy": existing.HostedByDID, 468 "createdAt": existing.CreatedAt.Format(time.RFC3339), 469 } 470 471 // Apply updates 472 if req.DisplayName != nil { 473 profile["displayName"] = *req.DisplayName 474 } else { 475 profile["displayName"] = existing.DisplayName 476 } 477 478 if req.Description != nil { 479 profile["description"] = *req.Description 480 } else { 481 profile["description"] = existing.Description 482 } 483 484 if req.Visibility != nil { 485 profile["visibility"] = *req.Visibility 486 } else { 487 profile["visibility"] = existing.Visibility 488 } 489 490 if 
req.AllowExternalDiscovery != nil { 491 profile["federation"] = map[string]interface{}{ 492 "allowExternalDiscovery": *req.AllowExternalDiscovery, 493 } 494 } else { 495 profile["federation"] = map[string]interface{}{ 496 "allowExternalDiscovery": existing.AllowExternalDiscovery, 497 } 498 } 499 500 // Preserve moderation settings (even if empty) 501 // These fields are optional but should not be erased on update 502 if req.ModerationType != nil { 503 profile["moderationType"] = *req.ModerationType 504 } else if existing.ModerationType != "" { 505 profile["moderationType"] = existing.ModerationType 506 } 507 508 if len(req.ContentWarnings) > 0 { 509 profile["contentWarnings"] = req.ContentWarnings 510 } else if len(existing.ContentWarnings) > 0 { 511 profile["contentWarnings"] = existing.ContentWarnings 512 } 513 514 // Add blob references if uploaded 515 if avatarRef != nil { 516 profile["avatar"] = map[string]interface{}{ 517 "$type": avatarRef.Type, 518 "ref": avatarRef.Ref, 519 "mimeType": avatarRef.MimeType, 520 "size": avatarRef.Size, 521 } 522 } 523 524 if bannerRef != nil { 525 profile["banner"] = map[string]interface{}{ 526 "$type": bannerRef.Type, 527 "ref": bannerRef.Ref, 528 "mimeType": bannerRef.MimeType, 529 "size": bannerRef.Size, 530 } 531 } 532 533 // V2: Community profiles always use "self" as rkey 534 // (No need to extract from URI - it's always "self" for V2 communities) 535 536 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 537 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 538 // Authenticate as the community (not as instance!) 539 if existing.PDSAccessToken == "" { 540 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 541 } 542 543 recordURI, recordCID, err := s.putRecordOnPDSAs( 544 ctx, 545 existing.DID, // repo = community's own DID (V2!) 
546 "social.coves.community.profile", 547 "self", // V2: always "self" 548 profile, 549 existing.PDSAccessToken, // authenticate as the community 550 ) 551 if err != nil { 552 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 553 } 554 555 // Return updated community representation 556 // Actual AppView DB update happens via Jetstream consumer 557 updated := *existing 558 if req.DisplayName != nil { 559 updated.DisplayName = *req.DisplayName 560 } 561 if req.Description != nil { 562 updated.Description = *req.Description 563 } 564 if req.Visibility != nil { 565 updated.Visibility = *req.Visibility 566 } 567 if req.AllowExternalDiscovery != nil { 568 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 569 } 570 if req.ModerationType != nil { 571 updated.ModerationType = *req.ModerationType 572 } 573 if len(req.ContentWarnings) > 0 { 574 updated.ContentWarnings = req.ContentWarnings 575 } 576 updated.RecordURI = recordURI 577 updated.RecordCID = recordCID 578 updated.UpdatedAt = time.Now() 579 580 return &updated, nil 581} 582 583// getOrCreateRefreshMutex returns a mutex for the given community DID 584// Thread-safe with read-lock fast path for existing entries 585// SAFETY: Does NOT evict entries to avoid race condition where: 586// 1. Thread A holds mutex for community-123 587// 2. Thread B evicts community-123 from map 588// 3. Thread C creates NEW mutex for community-123 589// 4. Now two threads can refresh community-123 concurrently (mutex defeated!) 
590func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex { 591 // Fast path: check if mutex already exists (read lock) 592 s.mapMutex.RLock() 593 mutex, exists := s.refreshMutexes[did] 594 s.mapMutex.RUnlock() 595 596 if exists { 597 return mutex 598 } 599 600 // Slow path: create new mutex (write lock) 601 s.mapMutex.Lock() 602 defer s.mapMutex.Unlock() 603 604 // Double-check after acquiring write lock (another goroutine might have created it) 605 mutex, exists = s.refreshMutexes[did] 606 if exists { 607 return mutex 608 } 609 610 // Create new mutex 611 mutex = &sync.Mutex{} 612 s.refreshMutexes[did] = mutex 613 614 // SAFETY: No eviction to prevent race condition 615 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes 616 if len(s.refreshMutexes) > maxMutexCacheSize { 617 memoryKB := len(s.refreshMutexes) * 16 / 1024 618 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. 
Memory usage: ~%d KB", 619 len(s.refreshMutexes), maxMutexCacheSize, memoryKB) 620 } 621 622 return mutex 623} 624 625// ensureFreshToken checks if a community's access token needs refresh and updates if needed 626// Returns updated community with fresh credentials (or original if no refresh needed) 627// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts 628// EnsureFreshToken ensures the community's PDS access token is valid 629// Exported for use by post service when writing posts to community repos 630func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) { 631 // Get or create mutex for this specific community DID 632 mutex := s.getOrCreateRefreshMutex(community.DID) 633 634 // Lock for this specific community (allows other communities to refresh concurrently) 635 mutex.Lock() 636 defer mutex.Unlock() 637 638 // Re-fetch community from DB (another goroutine might have already refreshed it) 639 fresh, err := s.repo.GetByDID(ctx, community.DID) 640 if err != nil { 641 return nil, fmt.Errorf("failed to re-fetch community: %w", err) 642 } 643 644 // Check if token needs refresh (5-minute buffer before expiration) 645 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken) 646 if err != nil { 647 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err) 648 return nil, fmt.Errorf("failed to check token expiration: %w", err) 649 } 650 651 if !needsRefresh { 652 // Token still valid, no refresh needed 653 return fresh, nil 654 } 655 656 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID) 657 658 // Attempt token refresh using refresh token 659 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSRefreshToken) 660 if err != nil { 661 // Check if refresh token expired (need password fallback) 662 // Match both "ExpiredToken" and "Token has expired" error 
messages 663 if strings.Contains(strings.ToLower(err.Error()), "expired") { 664 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID) 665 666 // Fallback: Re-authenticate with stored password 667 newAccessToken, newRefreshToken, err = reauthenticateWithPassword( 668 ctx, 669 fresh.PDSURL, 670 fresh.PDSEmail, 671 fresh.PDSPassword, // Retrieved decrypted from DB 672 ) 673 if err != nil { 674 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err) 675 return nil, fmt.Errorf("failed to re-authenticate community: %w", err) 676 } 677 678 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID) 679 } else { 680 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err) 681 return nil, fmt.Errorf("failed to refresh token: %w", err) 682 } 683 } 684 685 // CRITICAL: Update database with new tokens immediately 686 // Refresh tokens are SINGLE-USE - old one is now invalid 687 // Use retry logic to handle transient DB failures 688 const maxRetries = 3 689 var updateErr error 690 for attempt := 0; attempt < maxRetries; attempt++ { 691 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken) 692 if updateErr == nil { 693 break // Success 694 } 695 696 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v", 697 fresh.DID, attempt+1, maxRetries, updateErr) 698 699 if attempt < maxRetries-1 { 700 // Exponential backoff: 100ms, 200ms, 400ms 701 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond 702 time.Sleep(backoff) 703 } 704 } 705 706 if updateErr != nil { 707 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved 708 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v", 709 
fresh.DID, maxRetries, updateErr) 710 // TODO: Send alert to monitoring system (add in Beta) 711 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w", 712 maxRetries, updateErr) 713 } 714 715 // Return updated community object with fresh tokens 716 updatedCommunity := *fresh 717 updatedCommunity.PDSAccessToken = newAccessToken 718 updatedCommunity.PDSRefreshToken = newRefreshToken 719 720 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID) 721 722 return &updatedCommunity, nil 723} 724 725// ListCommunities queries AppView DB for communities with filters 726func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, error) { 727 // Set defaults 728 if req.Limit <= 0 || req.Limit > 100 { 729 req.Limit = 50 730 } 731 732 return s.repo.List(ctx, req) 733} 734 735// SearchCommunities performs fuzzy search in AppView DB 736func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 737 if req.Query == "" { 738 return nil, 0, NewValidationError("query", "search query is required") 739 } 740 741 // Set defaults 742 if req.Limit <= 0 || req.Limit > 100 { 743 req.Limit = 50 744 } 745 746 return s.repo.Search(ctx, req) 747} 748 749// SubscribeToCommunity creates a subscription via write-forward to PDS 750// Uses OAuth session with DPoP authentication for secure PDS communication 751func (s *communityService) SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*Subscription, error) { 752 if session == nil { 753 return nil, NewValidationError("session", "required") 754 } 755 756 userDID := session.AccountDID.String() 757 758 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 759 if contentVisibility <= 0 || contentVisibility > 5 { 760 
contentVisibility = 3 761 } 762 763 // Resolve community identifier to DID 764 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 765 if err != nil { 766 return nil, fmt.Errorf("subscribe: %w", err) 767 } 768 769 // Verify community exists 770 community, err := s.repo.GetByDID(ctx, communityDID) 771 if err != nil { 772 return nil, err 773 } 774 775 // Check visibility - can't subscribe to private communities without invitation (TODO) 776 if community.Visibility == "private" { 777 return nil, ErrUnauthorized 778 } 779 780 // Create PDS client for this session (DPoP authentication) 781 pdsClient, err := s.getPDSClient(ctx, session) 782 if err != nil { 783 return nil, fmt.Errorf("failed to create PDS client: %w", err) 784 } 785 786 // Generate TID for record key 787 tid := syntax.NewTIDNow(0) 788 789 // Build subscription record 790 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 791 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 792 // Following atProto conventions, we use "subject" field to reference the community 793 subRecord := map[string]interface{}{ 794 "$type": "social.coves.community.subscription", 795 "subject": communityDID, // atProto convention: "subject" for entity references 796 "createdAt": time.Now().Format(time.RFC3339), 797 "contentVisibility": contentVisibility, 798 } 799 800 // Write-forward: create subscription record in user's repo using DPoP-authenticated client 801 recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.subscription", tid.String(), subRecord) 802 if err != nil { 803 if pds.IsAuthError(err) { 804 return nil, ErrUnauthorized 805 } 806 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 807 } 808 809 // Return subscription representation 810 subscription := &Subscription{ 811 UserDID: userDID, 812 CommunityDID: 
communityDID, 813 ContentVisibility: contentVisibility, 814 SubscribedAt: time.Now(), 815 RecordURI: recordURI, 816 RecordCID: recordCID, 817 } 818 819 return subscription, nil 820} 821 822// UnsubscribeFromCommunity removes a subscription via PDS delete 823// Uses OAuth session with DPoP authentication for secure PDS communication 824func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 825 if session == nil { 826 return NewValidationError("session", "required") 827 } 828 829 userDID := session.AccountDID.String() 830 831 // Resolve community identifier 832 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 833 if err != nil { 834 return fmt.Errorf("unsubscribe: %w", err) 835 } 836 837 // Get the subscription from AppView to find the record key 838 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 839 if err != nil { 840 return err 841 } 842 843 // Extract rkey from record URI (at://did/collection/rkey) 844 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 845 if rkey == "" { 846 return fmt.Errorf("invalid subscription record URI") 847 } 848 849 // Create PDS client for this session (DPoP authentication) 850 pdsClient, err := s.getPDSClient(ctx, session) 851 if err != nil { 852 return fmt.Errorf("failed to create PDS client: %w", err) 853 } 854 855 // Write-forward: delete record from PDS using DPoP-authenticated client 856 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 857 if err := pdsClient.DeleteRecord(ctx, "social.coves.community.subscription", rkey); err != nil { 858 if pds.IsAuthError(err) { 859 return ErrUnauthorized 860 } 861 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 862 } 863 864 return nil 865} 866 867// GetUserSubscriptions queries AppView DB for user's subscriptions 868func (s *communityService) GetUserSubscriptions(ctx 
context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 869 if limit <= 0 || limit > 100 { 870 limit = 50 871 } 872 873 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 874} 875 876// GetCommunitySubscribers queries AppView DB for community subscribers 877func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 878 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 879 if err != nil { 880 return nil, err 881 } 882 883 if limit <= 0 || limit > 100 { 884 limit = 50 885 } 886 887 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 888} 889 890// GetMembership retrieves membership info from AppView DB 891func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 892 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 893 if err != nil { 894 return nil, err 895 } 896 897 return s.repo.GetMembership(ctx, userDID, communityDID) 898} 899 900// ListCommunityMembers queries AppView DB for members 901func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 902 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 903 if err != nil { 904 return nil, err 905 } 906 907 if limit <= 0 || limit > 100 { 908 limit = 50 909 } 910 911 return s.repo.ListMembers(ctx, communityDID, limit, offset) 912} 913 914// BlockCommunity blocks a community via write-forward to PDS 915// Uses OAuth session with DPoP authentication for secure PDS communication 916func (s *communityService) BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*CommunityBlock, error) { 917 if session == nil { 918 return nil, NewValidationError("session", "required") 919 } 920 921 userDID := session.AccountDID.String() 922 923 // 
Resolve community identifier (also verifies community exists) 924 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 925 if err != nil { 926 return nil, fmt.Errorf("block: %w", err) 927 } 928 929 // Create PDS client for this session (DPoP authentication) 930 pdsClient, err := s.getPDSClient(ctx, session) 931 if err != nil { 932 return nil, fmt.Errorf("block: failed to create PDS client: %w", err) 933 } 934 935 // Generate TID for record key 936 tid := syntax.NewTIDNow(0) 937 938 // Build block record 939 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 940 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 941 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 942 blockRecord := map[string]interface{}{ 943 "$type": "social.coves.community.block", 944 "subject": communityDID, // DID of community being blocked 945 "createdAt": time.Now().Format(time.RFC3339), 946 } 947 948 // Write-forward: create block record in user's repo using DPoP-authenticated client 949 // Note: We don't check for existing blocks first because: 950 // 1. The PDS may reject duplicates (depending on implementation) 951 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 952 // 3. 
This avoids a race condition where two concurrent requests both pass the check 953 recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.block", tid.String(), blockRecord) 954 if err != nil { 955 // Check for auth errors first 956 if pds.IsAuthError(err) { 957 return nil, ErrUnauthorized 958 } 959 960 // Check if this is a duplicate/conflict error from PDS 961 if pds.IsConflictError(err) { 962 // Fetch and return existing block from our indexed view 963 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 964 if getErr == nil { 965 // Block exists in our index - return it 966 return existingBlock, nil 967 } 968 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 969 // Any other error (DB outage, connection failure, etc.) should bubble up 970 if errors.Is(getErr, ErrBlockNotFound) { 971 // Race condition: PDS has the block but Jetstream hasn't indexed it yet 972 // Return typed conflict error so handler can return 409 instead of 500 973 // This is normal in eventually-consistent systems 974 return nil, ErrBlockAlreadyExists 975 } 976 // Real datastore error - bubble it up so operators see the failure 977 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 978 } 979 return nil, fmt.Errorf("failed to create block on PDS: %w", err) 980 } 981 982 // Return block representation 983 block := &CommunityBlock{ 984 UserDID: userDID, 985 CommunityDID: communityDID, 986 BlockedAt: time.Now(), 987 RecordURI: recordURI, 988 RecordCID: recordCID, 989 } 990 991 return block, nil 992} 993 994// UnblockCommunity removes a block via PDS delete 995// Uses OAuth session with DPoP authentication for secure PDS communication 996func (s *communityService) UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 997 if session == nil { 998 return NewValidationError("session", "required") 999 } 1000 1001 userDID := 
session.AccountDID.String() 1002 1003 // Resolve community identifier 1004 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 1005 if err != nil { 1006 return fmt.Errorf("unblock: %w", err) 1007 } 1008 1009 // Get the block from AppView to find the record key 1010 block, err := s.repo.GetBlock(ctx, userDID, communityDID) 1011 if err != nil { 1012 return err 1013 } 1014 1015 // Extract rkey from record URI (at://did/collection/rkey) 1016 rkey := utils.ExtractRKeyFromURI(block.RecordURI) 1017 if rkey == "" { 1018 return fmt.Errorf("invalid block record URI") 1019 } 1020 1021 // Create PDS client for this session (DPoP authentication) 1022 pdsClient, err := s.getPDSClient(ctx, session) 1023 if err != nil { 1024 return fmt.Errorf("failed to create PDS client: %w", err) 1025 } 1026 1027 // Write-forward: delete record from PDS using DPoP-authenticated client 1028 if err := pdsClient.DeleteRecord(ctx, "social.coves.community.block", rkey); err != nil { 1029 if pds.IsAuthError(err) { 1030 return ErrUnauthorized 1031 } 1032 return fmt.Errorf("failed to delete block on PDS: %w", err) 1033 } 1034 1035 return nil 1036} 1037 1038// GetBlockedCommunities queries AppView DB for user's blocks 1039func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 1040 if limit <= 0 || limit > 100 { 1041 limit = 50 1042 } 1043 1044 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 1045} 1046 1047// IsBlocked checks if a user has blocked a community 1048func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 1049 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 1050 if err != nil { 1051 return false, err 1052 } 1053 1054 return s.repo.IsBlocked(ctx, userDID, communityDID) 1055} 1056 1057// ValidateHandle checks if a community handle is valid 1058func (s *communityService) ValidateHandle(handle 
string) error { 1059 if handle == "" { 1060 return NewValidationError("handle", "required") 1061 } 1062 1063 if !communityHandleRegex.MatchString(handle) { 1064 return ErrInvalidHandle 1065 } 1066 1067 return nil 1068} 1069 1070// ResolveCommunityIdentifier converts a community identifier to a DID 1071// Following Bluesky's pattern with Coves extensions: 1072// 1073// Accepts (like Bluesky's at-identifier): 1074// 1. DID: did:plc:abc123 (pass through) 1075// 2. Canonical handle: gardening.community.coves.social (atProto standard) 1076// 3. At-identifier: @gardening.community.coves.social (strip @ prefix) 1077// 1078// Coves-specific extensions: 1079// 4. Scoped format: !gardening@coves.social (parse and resolve) 1080// 1081// Returns: DID string 1082func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 1083 identifier = strings.TrimSpace(identifier) 1084 1085 if identifier == "" { 1086 return "", ErrInvalidInput 1087 } 1088 1089 // 1. DID - verify it exists and return (Bluesky standard) 1090 if strings.HasPrefix(identifier, "did:") { 1091 _, err := s.repo.GetByDID(ctx, identifier) 1092 if err != nil { 1093 if IsNotFound(err) { 1094 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err) 1095 } 1096 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err) 1097 } 1098 return identifier, nil 1099 } 1100 1101 // 2. Scoped format: !name@instance (Coves-specific) 1102 if strings.HasPrefix(identifier, "!") { 1103 return s.resolveScopedIdentifier(ctx, identifier) 1104 } 1105 1106 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix) 1107 identifier = strings.TrimPrefix(identifier, "@") 1108 1109 // 4. 
Canonical handle: name.community.instance.com (Bluesky standard) 1110 if strings.Contains(identifier, ".") { 1111 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier)) 1112 if err != nil { 1113 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err) 1114 } 1115 return community.DID, nil 1116 } 1117 1118 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)") 1119} 1120 1121// resolveScopedIdentifier handles Coves-specific !name@instance format 1122// Formats accepted: 1123// 1124// !gardening@coves.social -> c-gardening.coves.social 1125func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) { 1126 // Remove ! prefix 1127 scoped = strings.TrimPrefix(scoped, "!") 1128 1129 var name string 1130 var instanceDomain string 1131 1132 // Parse !name@instance 1133 if !strings.Contains(scoped, "@") { 1134 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)") 1135 } 1136 1137 parts := strings.SplitN(scoped, "@", 2) 1138 name = strings.TrimSpace(parts[0]) 1139 instanceDomain = strings.TrimSpace(parts[1]) 1140 1141 // Validate name format 1142 if name == "" { 1143 return "", NewValidationError("identifier", "community name cannot be empty") 1144 } 1145 1146 // Validate name is a valid DNS label (RFC 1035) 1147 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen 1148 if !isValidDNSLabel(name) { 1149 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)") 1150 } 1151 1152 // Validate instance domain format 1153 if !isValidDomain(instanceDomain) { 1154 return "", NewValidationError("identifier", "invalid instance domain format") 1155 } 1156 1157 // Normalize domain to lowercase (DNS is case-insensitive) 1158 // This fixes the bug where !gardening@Coves.social would 
fail lookup 1159 instanceDomain = strings.ToLower(instanceDomain) 1160 1161 // Validate the instance matches this server 1162 if !s.isLocalInstance(instanceDomain) { 1163 return "", NewValidationError("identifier", 1164 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain)) 1165 } 1166 1167 // Construct canonical handle: c-{name}.{instanceDomain} 1168 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup 1169 canonicalHandle := fmt.Sprintf("c-%s.%s", 1170 strings.ToLower(name), 1171 instanceDomain) // Already normalized to lowercase above 1172 1173 // Look up by canonical handle 1174 community, err := s.repo.GetByHandle(ctx, canonicalHandle) 1175 if err != nil { 1176 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err) 1177 } 1178 1179 return community.DID, nil 1180} 1181 1182// isLocalInstance checks if the provided domain matches this instance 1183func (s *communityService) isLocalInstance(domain string) bool { 1184 // Normalize both domains 1185 domain = strings.ToLower(strings.TrimSpace(domain)) 1186 instanceDomain := strings.ToLower(s.instanceDomain) 1187 1188 // Direct match 1189 return domain == instanceDomain 1190} 1191 1192// Validation helpers 1193 1194// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035 1195// - 1-63 characters 1196// - Alphanumeric and hyphens only 1197// - Cannot start or end with hyphen 1198func isValidDNSLabel(label string) bool { 1199 return dnsLabelRegex.MatchString(label) 1200} 1201 1202// isValidDomain validates that a string is a valid domain name 1203// Simplified validation - checks basic DNS hostname structure 1204func isValidDomain(domain string) bool { 1205 if domain == "" || len(domain) > 253 { 1206 return false 1207 } 1208 return domainRegex.MatchString(domain) 1209} 1210 1211func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 1212 if req.Name == "" 
{ 1213 return NewValidationError("name", "required") 1214 } 1215 1216 // DNS label limit: 63 characters per label 1217 // Community handle format: {name}.community.{instanceDomain} 1218 // The first label is just req.Name, so it must be <= 63 chars 1219 if len(req.Name) > 63 { 1220 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 1221 } 1222 1223 // Name can only contain alphanumeric and hyphens 1224 // Must start and end with alphanumeric (not hyphen) 1225 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 1226 if !nameRegex.MatchString(req.Name) { 1227 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 1228 } 1229 1230 if req.Description != "" && len(req.Description) > 3000 { 1231 return NewValidationError("description", "must be 3000 characters or less") 1232 } 1233 1234 // Visibility should already be set with default in CreateCommunity 1235 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 1236 return ErrInvalidVisibility 1237 } 1238 1239 if req.CreatedByDID == "" { 1240 return NewValidationError("createdByDid", "required") 1241 } 1242 1243 // hostedByDID is auto-populated by the service layer, no validation needed 1244 // The handler ensures clients cannot provide this field 1245 1246 return nil 1247} 1248 1249// PDS write-forward helpers 1250 1251// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 1252func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1253 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 1254 1255 payload := map[string]interface{}{ 1256 "repo": repoDID, 1257 "collection": collection, 1258 "record": record, 1259 } 1260 1261 if rkey != "" { 1262 payload["rkey"] = rkey 1263 } 1264 
1265 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1266} 1267 1268// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 1269func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1270 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 1271 1272 payload := map[string]interface{}{ 1273 "repo": repoDID, 1274 "collection": collection, 1275 "rkey": rkey, 1276 "record": record, 1277 } 1278 1279 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1280} 1281 1282// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 1283func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 1284 jsonData, err := json.Marshal(payload) 1285 if err != nil { 1286 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 1287 } 1288 1289 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 1290 if err != nil { 1291 return "", "", fmt.Errorf("failed to create request: %w", err) 1292 } 1293 req.Header.Set("Content-Type", "application/json") 1294 1295 // Add authentication with provided access token 1296 if accessToken != "" { 1297 req.Header.Set("Authorization", "Bearer "+accessToken) 1298 } 1299 1300 // Dynamic timeout based on operation type 1301 // Write operations (createAccount, createRecord, putRecord) are slower due to: 1302 // - Keypair generation 1303 // - DID PLC registration 1304 // - Database writes on PDS 1305 timeout := 10 * time.Second // Default for read operations 1306 if strings.Contains(endpoint, "createAccount") || 1307 strings.Contains(endpoint, "createRecord") || 1308 strings.Contains(endpoint, "putRecord") { 1309 timeout = 30 * 
time.Second // Extended timeout for write operations 1310 } 1311 1312 client := &http.Client{Timeout: timeout} 1313 resp, err := client.Do(req) 1314 if err != nil { 1315 return "", "", fmt.Errorf("failed to call PDS: %w", err) 1316 } 1317 defer func() { 1318 if closeErr := resp.Body.Close(); closeErr != nil { 1319 log.Printf("Failed to close response body: %v", closeErr) 1320 } 1321 }() 1322 1323 body, err := io.ReadAll(resp.Body) 1324 if err != nil { 1325 return "", "", fmt.Errorf("failed to read response: %w", err) 1326 } 1327 1328 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 1329 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 1330 } 1331 1332 // Parse response to extract URI and CID 1333 var result struct { 1334 URI string `json:"uri"` 1335 CID string `json:"cid"` 1336 } 1337 if err := json.Unmarshal(body, &result); err != nil { 1338 // For delete operations, there might not be a response body 1339 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 1340 return "", "", nil 1341 } 1342 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 1343 } 1344 1345 return result.URI, result.CID, nil 1346} 1347 1348// Helper functions