Live video on the AT Protocol

model: use atproto objects

Assisted-by: Claude Code

makeworld 41b6565b ea0e64df

+253 -67
+63
js/docs/src/content/docs/lex-reference/moderation/place-stream-moderation-defs.md
··· 1 + --- 2 + title: place.stream.moderation.defs 3 + description: Reference for the place.stream.moderation.defs lexicon 4 + --- 5 + **Lexicon Version:** 1 6 + 7 + ## Definitions 8 + 9 + <a name="permissionview"></a> 10 + ### `permissionView` 11 + 12 + **Type:** `object` 13 + 14 + **Properties:** 15 + 16 + | Name | Type | Req'd | Description | Constraints | 17 + |------|------|----------|-------------|-------------| 18 + | `uri` | `string` | ✅ | AT-URI of the permission record | Format: `at-uri` | 19 + | `cid` | `string` | ✅ | Content identifier of the permission record | Format: `cid` | 20 + | `author` | [`app.bsky.actor.defs#profileViewBasic`](https://github.com/bluesky-social/atproto/tree/main/lexicons/app/bsky/actor/defs.json#profileViewBasic) | ✅ | The streamer who granted these permissions | | 21 + | `record` | `unknown` | ✅ | The permission record itself | | 22 + 23 + --- 24 + 25 + ## Lexicon Source 26 + ```json 27 + { 28 + "lexicon": 1, 29 + "id": "place.stream.moderation.defs", 30 + "defs": { 31 + "permissionView": { 32 + "type": "object", 33 + "required": [ 34 + "uri", 35 + "cid", 36 + "author", 37 + "record" 38 + ], 39 + "properties": { 40 + "uri": { 41 + "type": "string", 42 + "format": "at-uri", 43 + "description": "AT-URI of the permission record" 44 + }, 45 + "cid": { 46 + "type": "string", 47 + "format": "cid", 48 + "description": "Content identifier of the permission record" 49 + }, 50 + "author": { 51 + "type": "ref", 52 + "ref": "app.bsky.actor.defs#profileViewBasic", 53 + "description": "The streamer who granted these permissions" 54 + }, 55 + "record": { 56 + "type": "unknown", 57 + "description": "The permission record itself" 58 + } 59 + } 60 + } 61 + } 62 + } 63 + ```
+31
lexicons/place/stream/moderation/defs.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.moderation.defs", 4 + "defs": { 5 + "permissionView": { 6 + "type": "object", 7 + "required": ["uri", "cid", "author", "record"], 8 + "properties": { 9 + "uri": { 10 + "type": "string", 11 + "format": "at-uri", 12 + "description": "AT-URI of the permission record" 13 + }, 14 + "cid": { 15 + "type": "string", 16 + "format": "cid", 17 + "description": "Content identifier of the permission record" 18 + }, 19 + "author": { 20 + "type": "ref", 21 + "ref": "app.bsky.actor.defs#profileViewBasic", 22 + "description": "The streamer who granted these permissions" 23 + }, 24 + "record": { 25 + "type": "unknown", 26 + "description": "The permission record itself" 27 + } 28 + } 29 + } 30 + } 31 + }
+12 -33
pkg/atproto/sync.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "errors" 7 6 "fmt" 8 7 "reflect" ··· 431 430 } 432 431 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator) 433 432 434 - permissionsJSON, err := json.Marshal(rec.Permissions) 435 - if err != nil { 436 - return fmt.Errorf("failed to marshal permissions: %w", err) 437 - } 438 - 439 - // Parse optional expiration time 440 - var expirationTime *time.Time 441 - if rec.ExpirationTime != nil { 442 - t, err := time.Parse(time.RFC3339, *rec.ExpirationTime) 443 - if err != nil { 444 - log.Warn(ctx, "failed to parse expiration time", "value", *rec.ExpirationTime, "err", err) 445 - } else { 446 - expirationTime = &t 447 - } 448 - } 449 - 450 - delegation := &model.ModerationDelegation{ 451 - RKey: rkey.String(), 452 - CID: cid, 453 - RepoDID: userDID, 454 - Repo: repo, 455 - ModeratorDID: rec.Moderator, 456 - Permissions: permissionsJSON, 457 - ExpirationTime: expirationTime, 458 - Record: *recCBOR, 459 - CreatedAt: now, 460 - IndexedAt: now, 461 - } 462 - 463 - err = atsync.Model.CreateModerationDelegation(ctx, delegation) 433 + err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi) 464 434 if err != nil { 465 435 return fmt.Errorf("failed to create moderation delegation: %w", err) 466 436 } 467 437 468 - // Publish moderation permission record to WebSocket bus for real-time updates 438 + view := &streamplace.ModerationDefs_PermissionView{ 439 + Uri: aturi.String(), 440 + Cid: cid, 441 + Author: &bsky.ActorDefs_ProfileViewBasic{ 442 + Did: userDID, 443 + Handle: repo.Handle, 444 + }, 445 + Record: &lexutil.LexiconTypeDecoder{Val: rec}, 446 + } 447 + // Publish moderation permission view to WebSocket bus for real-time updates 469 448 // This allows moderators to see their permissions instantly without page refresh 470 - go atsync.Bus.Publish(userDID, rec) 449 + go atsync.Bus.Publish(userDID, view) 471 450 472 451 default: 473 452 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
+5 -5
pkg/model/model.go
··· 108 108 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error) 109 109 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error 110 110 111 - CreateModerationDelegation(ctx context.Context, delegation *ModerationDelegation) error 111 + CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error 112 112 DeleteModerationDelegation(ctx context.Context, rkey string) error 113 - GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*ModerationDelegation, error) 114 - GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*ModerationDelegation, error) 115 - GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*ModerationDelegation, error) 116 - GetStreamerModerators(ctx context.Context, streamerDID string) ([]*ModerationDelegation, error) 113 + GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error) 114 + GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 115 + GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 116 + GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 117 117 118 118 CreateAuditLog(ctx context.Context, log *ModerationAuditLog) error 119 119 GetAuditLogs(ctx context.Context, streamerDID string, limit int, before *time.Time) ([]*ModerationAuditLog, error)
+104 -19
pkg/model/moderation_delegation.go
··· 1 1 package model 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "errors" 7 + "fmt" 6 8 "time" 7 9 10 + "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + lexutil "github.com/bluesky-social/indigo/lex/util" 8 13 "gorm.io/gorm" 14 + "stream.place/streamplace/pkg/aqtime" 15 + "stream.place/streamplace/pkg/spid" 16 + "stream.place/streamplace/pkg/streamplace" 9 17 ) 10 18 11 19 type ModerationDelegation struct { 12 - RKey string `gorm:"primaryKey;column:rkey"` 13 - CID string `gorm:"column:cid"` 14 - RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_moderator,priority:1"` 15 - Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 16 - ModeratorDID string `gorm:"column:moderator_did;index:idx_repo_moderator,priority:2;index:idx_moderator"` 17 - Permissions []byte `gorm:"column:permissions"` // JSON array stored as bytes 18 - ExpirationTime *time.Time `gorm:"column:expiration_time"` // Optional expiration timestamp 19 - Record []byte `gorm:"column:record"` // Full CBOR record 20 - CreatedAt time.Time `gorm:"column:created_at"` 21 - IndexedAt time.Time `gorm:"column:indexed_at"` 20 + RKey string `gorm:"primaryKey;column:rkey"` 21 + CID string `gorm:"column:cid"` 22 + RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_moderator,priority:1"` 23 + Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 24 + ModeratorDID string `gorm:"column:moderator_did;index:idx_repo_moderator,priority:2;index:idx_moderator"` 25 + Record []byte `gorm:"column:record"` // Full CBOR record 26 + CreatedAt time.Time `gorm:"column:created_at"` 27 + IndexedAt time.Time `gorm:"column:indexed_at"` 22 28 } 23 29 24 - func (m *DBModel) CreateModerationDelegation(ctx context.Context, delegation *ModerationDelegation) error { 30 + func (md *ModerationDelegation) ToPermissionView() (*streamplace.ModerationDefs_PermissionView, error) { 31 + rec, err := lexutil.CborDecodeValue(md.Record) 32 + if err != nil { 33 + return nil, fmt.Errorf("error decoding moderation permission: %w", err) 34 + } 35 + 36 + uri := fmt.Sprintf("at://%s/place.stream.moderation.permission/%s", md.RepoDID, md.RKey) 37 + 38 + view := &streamplace.ModerationDefs_PermissionView{ 39 + Author: &bsky.ActorDefs_ProfileViewBasic{ 40 + Did: md.RepoDID, 41 + }, 42 + Cid: md.CID, 43 + Record: &lexutil.LexiconTypeDecoder{Val: rec}, 44 + Uri: uri, 45 + } 46 + 47 + if md.Repo != nil { 48 + view.Author.Handle = md.Repo.Handle 49 + } 50 + 51 + return view, nil 52 + } 53 + 54 + func (m *DBModel) CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error { 55 + repoDID, err := aturi.Authority().AsDID() 56 + if err != nil { 57 + return fmt.Errorf("invalid ATURI authority: %w", err) 58 + } 59 + cid, err := spid.GetCID(rec) 60 + if err != nil { 61 + return fmt.Errorf("failed to get CID: %w", err) 62 + } 63 + rkey := aturi.RecordKey().String() 64 + 65 + buf := bytes.Buffer{} 66 + err = rec.MarshalCBOR(&buf) 67 + if err != nil { 68 + return fmt.Errorf("failed to marshal moderation permission: %w", err) 69 + } 70 + 71 + now := aqtime.FromTime(time.Now().UTC()) 72 + 73 + delegation := &ModerationDelegation{ 74 + RKey: rkey, 75 + CID: cid.String(), 76 + RepoDID: repoDID.String(), 77 + ModeratorDID: rec.Moderator, 78 + Record: buf.Bytes(), 79 + CreatedAt: now.Time().UTC(), 80 + IndexedAt: now.Time().UTC(), 81 + } 82 + 25 83 return m.DB.WithContext(ctx).Create(delegation).Error 26 84 } 27 85 ··· 29 87 return m.DB.WithContext(ctx).Where("rkey = ?", rkey).Delete(&ModerationDelegation{}).Error 30 88 } 31 89 32 - func (m *DBModel) GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*ModerationDelegation, error) { 90 + func (m *DBModel) GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error) { 33 91 var delegation ModerationDelegation 34 92 err := m.DB.WithContext(ctx).Preload("Repo"). 35 93 Where("repo_did = ? AND moderator_did = ?", streamerDID, moderatorDID). ··· 41 99 if err != nil { 42 100 return nil, err 43 101 } 44 - return &delegation, nil 102 + return delegation.ToPermissionView() 45 103 } 46 104 47 105 // GetModerationDelegations returns ALL delegation records for a moderator from a specific streamer. 48 106 // This allows multiple separate permission records (e.g., one for "ban", one for "hide") to be merged. 49 - func (m *DBModel) GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*ModerationDelegation, error) { 107 + func (m *DBModel) GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) { 50 108 var delegations []*ModerationDelegation 51 109 err := m.DB.WithContext(ctx).Preload("Repo"). 52 110 Where("repo_did = ? AND moderator_did = ?", streamerDID, moderatorDID). ··· 54 112 if err != nil { 55 113 return nil, err 56 114 } 57 - return delegations, nil 115 + 116 + views := make([]*streamplace.ModerationDefs_PermissionView, len(delegations)) 117 + for i, d := range delegations { 118 + view, err := d.ToPermissionView() 119 + if err != nil { 120 + return nil, err 121 + } 122 + views[i] = view 123 + } 124 + return views, nil 58 125 } 59 126 60 - func (m *DBModel) GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*ModerationDelegation, error) { 127 + func (m *DBModel) GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) { 61 128 var delegations []*ModerationDelegation 62 129 err := m.DB.WithContext(ctx).Preload("Repo"). 63 130 Where("moderator_did = ?", moderatorDID). ··· 65 132 if err != nil { 66 133 return nil, err 67 134 } 68 - return delegations, nil 135 + 136 + views := make([]*streamplace.ModerationDefs_PermissionView, len(delegations)) 137 + for i, d := range delegations { 138 + view, err := d.ToPermissionView() 139 + if err != nil { 140 + return nil, err 141 + } 142 + views[i] = view 143 + } 144 + return views, nil 69 145 } 70 146 71 - func (m *DBModel) GetStreamerModerators(ctx context.Context, streamerDID string) ([]*ModerationDelegation, error) { 147 + func (m *DBModel) GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error) { 72 148 var delegations []*ModerationDelegation 73 149 err := m.DB.WithContext(ctx).Preload("Repo"). 74 150 Where("repo_did = ?", streamerDID). ··· 76 152 if err != nil { 77 153 return nil, err 78 154 } 79 - return delegations, nil 155 + 156 + views := make([]*streamplace.ModerationDefs_PermissionView, len(delegations)) 157 + for i, d := range delegations { 158 + view, err := d.ToPermissionView() 159 + if err != nil { 160 + return nil, err 161 + } 162 + views[i] = view 163 + } 164 + return views, nil 80 165 }
+16 -10
pkg/moderation/permissions.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "time" 8 7 9 8 "stream.place/streamplace/pkg/model" 9 + "stream.place/streamplace/pkg/streamplace" 10 10 ) 11 11 12 12 // Permission scope constants ··· 81 81 } 82 82 83 83 // Check all delegation records and merge their permissions 84 - for _, delegation := range delegations { 85 - // Skip expired delegations 86 - if delegation.ExpirationTime != nil && time.Now().After(*delegation.ExpirationTime) { 87 - continue 84 + for _, delegationView := range delegations { 85 + // Extract the actual permission record from the view 86 + permRecord, ok := delegationView.Record.Val.(*streamplace.ModerationPermission) 87 + if !ok { 88 + return false, fmt.Errorf("failed to cast record to ModerationPermission") 88 89 } 89 90 90 - // Parse permissions JSON array 91 - var permissions []string 92 - if err := json.Unmarshal(delegation.Permissions, &permissions); err != nil { 93 - return false, fmt.Errorf("failed to unmarshal permissions: %w", err) 91 + // Skip expired delegations 92 + if permRecord.ExpirationTime != nil { 93 + expirationTime, err := time.Parse(time.RFC3339, *permRecord.ExpirationTime) 94 + if err != nil { 95 + return false, fmt.Errorf("failed to parse expiration time: %w", err) 96 + } 97 + if time.Now().After(expirationTime) { 98 + continue 99 + } 94 100 } 95 101 96 102 // Check if this delegation has the required permission 97 - for _, p := range permissions { 103 + for _, p := range permRecord.Permissions { 98 104 if p == permission { 99 105 return true, nil 100 106 }
+22
pkg/streamplace/moderationdefs.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.moderation.defs 4 + 5 + package streamplace 6 + 7 + import ( 8 + appbsky "github.com/bluesky-social/indigo/api/bsky" 9 + lexutil "github.com/bluesky-social/indigo/lex/util" 10 + ) 11 + 12 + // ModerationDefs_PermissionView is a "permissionView" in the place.stream.moderation.defs schema. 13 + type ModerationDefs_PermissionView struct { 14 + // author: The streamer who granted these permissions 15 + Author *appbsky.ActorDefs_ProfileViewBasic `json:"author" cborgen:"author"` 16 + // cid: Content identifier of the permission record 17 + Cid string `json:"cid" cborgen:"cid"` 18 + // record: The permission record itself 19 + Record *lexutil.LexiconTypeDecoder `json:"record" cborgen:"record"` 20 + // uri: AT-URI of the permission record 21 + Uri string `json:"uri" cborgen:"uri"` 22 + }