···1+package pds
2+3+import (
4+ "context"
5+ "encoding/json"
6+ "log"
7+ "sync"
8+ "time"
9+10+ atproto "github.com/bluesky-social/indigo/api/atproto"
11+ lexutil "github.com/bluesky-social/indigo/lex/util"
12+ "github.com/gorilla/websocket"
13+)
14+15+// EventBroadcaster manages WebSocket connections and broadcasts repo events
16+type EventBroadcaster struct {
17+ mu sync.RWMutex
18+ subscribers map[*Subscriber]bool
19+ eventSeq int64
20+ eventHistory []HistoricalEvent // Ring buffer for cursor backfill
21+ maxHistory int
22+ holdDID string // DID of the hold for setting repo field
23+}
24+25+// Subscriber represents a WebSocket client subscribed to the firehose
26+type Subscriber struct {
27+ conn *websocket.Conn
28+ send chan *RepoCommitEvent
29+ cursor int64 // Last sequence number this subscriber has seen
30+}
31+32+// HistoricalEvent stores past events for cursor-based backfill
33+type HistoricalEvent struct {
34+ Seq int64
35+ Event *RepoCommitEvent
36+}
37+38+// RepoCommitEvent represents a #commit event in subscribeRepos
39+type RepoCommitEvent struct {
40+ Seq int64 `json:"seq" cborgen:"seq"`
41+ Repo string `json:"repo" cborgen:"repo"`
42+ Commit string `json:"commit" cborgen:"commit"` // CID string
43+ Rev string `json:"rev" cborgen:"rev"`
44+ Since *string `json:"since,omitempty" cborgen:"since,omitempty"`
45+ Blocks []byte `json:"blocks" cborgen:"blocks"` // CAR slice bytes
46+ Ops []*atproto.SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"`
47+ Time string `json:"time" cborgen:"time"`
48+ Type string `json:"$type" cborgen:"$type"` // Always "#commit"
49+}
50+51+// NewEventBroadcaster creates a new event broadcaster
52+func NewEventBroadcaster(holdDID string, maxHistory int) *EventBroadcaster {
53+ if maxHistory <= 0 {
54+ maxHistory = 100 // Default to keeping 100 events
55+ }
56+57+ return &EventBroadcaster{
58+ subscribers: make(map[*Subscriber]bool),
59+ eventSeq: 0,
60+ eventHistory: make([]HistoricalEvent, 0, maxHistory),
61+ maxHistory: maxHistory,
62+ holdDID: holdDID,
63+ }
64+}
65+66+// Subscribe adds a new WebSocket subscriber
67+func (b *EventBroadcaster) Subscribe(conn *websocket.Conn, cursor int64) *Subscriber {
68+ sub := &Subscriber{
69+ conn: conn,
70+ send: make(chan *RepoCommitEvent, 10), // Buffer 10 events
71+ cursor: cursor,
72+ }
73+74+ b.mu.Lock()
75+ b.subscribers[sub] = true
76+ currentSeq := b.eventSeq
77+ b.mu.Unlock()
78+79+ // Send historical events if cursor is provided and < current seq
80+ if cursor > 0 && cursor < currentSeq {
81+ go b.backfillSubscriber(sub, cursor)
82+ }
83+84+ // Start goroutine to handle sending events to this subscriber
85+ go b.handleSubscriber(sub)
86+87+ return sub
88+}
89+90+// Unsubscribe removes a WebSocket subscriber
91+func (b *EventBroadcaster) Unsubscribe(sub *Subscriber) {
92+ b.mu.Lock()
93+ defer b.mu.Unlock()
94+95+ if _, ok := b.subscribers[sub]; ok {
96+ delete(b.subscribers, sub)
97+ close(sub.send)
98+ }
99+}
100+101+// Broadcast sends an event to all subscribers
102+func (b *EventBroadcaster) Broadcast(ctx context.Context, event *RepoEvent) {
103+ b.mu.Lock()
104+ defer b.mu.Unlock()
105+106+ // Increment sequence
107+ b.eventSeq++
108+ seq := b.eventSeq
109+110+ // Convert RepoEvent to RepoCommitEvent
111+ commitEvent := b.convertToCommitEvent(event, seq)
112+113+ // Store in history for backfill
114+ b.addToHistory(seq, commitEvent)
115+116+ // Broadcast to all subscribers
117+ for sub := range b.subscribers {
118+ select {
119+ case sub.send <- commitEvent:
120+ // Sent successfully
121+ default:
122+ // Subscriber's buffer is full, skip (they'll get disconnected for being too slow)
123+ log.Printf("Warning: subscriber buffer full, skipping event seq=%d", seq)
124+ }
125+ }
126+}
127+128+// convertToCommitEvent converts a RepoEvent to a RepoCommitEvent
129+func (b *EventBroadcaster) convertToCommitEvent(event *RepoEvent, seq int64) *RepoCommitEvent {
130+ // Convert RepoOps to atproto.SyncSubscribeRepos_RepoOp
131+ ops := make([]*atproto.SyncSubscribeRepos_RepoOp, len(event.Ops))
132+ for i, op := range event.Ops {
133+ action := string(op.Kind) // "create", "update", "delete"
134+ path := op.Collection + "/" + op.Rkey
135+136+ // Convert CID to LexLink if present
137+ var cidLink *lexutil.LexLink
138+ if op.RecCid != nil {
139+ link := lexutil.LexLink(*op.RecCid)
140+ cidLink = &link
141+ }
142+143+ ops[i] = &atproto.SyncSubscribeRepos_RepoOp{
144+ Action: action,
145+ Path: path,
146+ Cid: cidLink,
147+ }
148+ }
149+150+ // Event.NewRoot is a cid.Cid, convert to string
151+ commitCID := event.NewRoot.String()
152+153+ return &RepoCommitEvent{
154+ Seq: seq,
155+ Repo: b.holdDID, // Set to hold's DID
156+ Commit: commitCID,
157+ Rev: event.Rev,
158+ Since: event.Since,
159+ Blocks: event.RepoSlice, // CAR slice bytes
160+ Ops: ops,
161+ Time: time.Now().Format(time.RFC3339),
162+ Type: "#commit",
163+ }
164+}
165+166+// addToHistory adds an event to the history ring buffer
167+func (b *EventBroadcaster) addToHistory(seq int64, event *RepoCommitEvent) {
168+ he := HistoricalEvent{
169+ Seq: seq,
170+ Event: event,
171+ }
172+173+ // Simple ring buffer: keep last N events
174+ if len(b.eventHistory) >= b.maxHistory {
175+ // Remove oldest event
176+ b.eventHistory = b.eventHistory[1:]
177+ }
178+ b.eventHistory = append(b.eventHistory, he)
179+}
180+181+// backfillSubscriber sends historical events to a subscriber
182+func (b *EventBroadcaster) backfillSubscriber(sub *Subscriber, cursor int64) {
183+ b.mu.RLock()
184+ defer b.mu.RUnlock()
185+186+ for _, he := range b.eventHistory {
187+ if he.Seq > cursor {
188+ select {
189+ case sub.send <- he.Event:
190+ // Sent
191+ case <-time.After(5 * time.Second):
192+ // Timeout, subscriber too slow
193+ log.Printf("Backfill timeout for subscriber at seq=%d", he.Seq)
194+ return
195+ }
196+ }
197+ }
198+}
199+200+// handleSubscriber handles sending events to a subscriber over WebSocket
201+func (b *EventBroadcaster) handleSubscriber(sub *Subscriber) {
202+ defer func() {
203+ b.Unsubscribe(sub)
204+ sub.conn.Close()
205+ }()
206+207+ for event := range sub.send {
208+ // Encode as CBOR
209+ cborBytes, err := encodeCBOR(event)
210+ if err != nil {
211+ log.Printf("Failed to encode event as CBOR: %v", err)
212+ continue
213+ }
214+215+ // Write CBOR message to WebSocket
216+ err = sub.conn.WriteMessage(websocket.BinaryMessage, cborBytes)
217+ if err != nil {
218+ log.Printf("Failed to write to websocket: %v", err)
219+ return
220+ }
221+222+ // Update cursor
223+ sub.cursor = event.Seq
224+ }
225+}
226+227+// encodeCBOR encodes an event as CBOR
228+func encodeCBOR(event *RepoCommitEvent) ([]byte, error) {
229+ // For now, use JSON encoding wrapped in CBOR envelope
230+ // In production, you'd use proper CBOR encoding
231+ // The atproto spec requires DAG-CBOR with specific header
232+233+ // Simple approach: encode as JSON for MVP
234+ // Real implementation needs proper CBOR-gen types
235+ return json.Marshal(event)
236+}
237+238+// SetRepoEventHandler creates a callback to be registered with RepoManager
239+func (b *EventBroadcaster) SetRepoEventHandler() func(context.Context, *RepoEvent) {
240+ return func(ctx context.Context, event *RepoEvent) {
241+ // Broadcast the event to all subscribers
242+ // The holdDID is already set in the broadcaster
243+ b.Broadcast(ctx, event)
244+ }
245+}
246+247+// GetCurrentSeq returns the current event sequence number
248+func (b *EventBroadcaster) GetCurrentSeq() int64 {
249+ b.mu.RLock()
250+ defer b.mu.RUnlock()
251+ return b.eventSeq
252+}
+5
pkg/hold/pds/server.go
···102 return p.signingKey
103}
10400000105// Bootstrap initializes the hold with the captain record and owner as first crew member
106func (p *HoldPDS) Bootstrap(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) error {
107 if ownerDID == "" {
···102 return p.signingKey
103}
104105+// RepomgrRef returns a reference to the RepoManager for event handler setup
106+func (p *HoldPDS) RepomgrRef() *RepoManager {
107+ return p.repomgr
108+}
109+110// Bootstrap initializes the hold with the captain record and owner as first crew member
111func (p *HoldPDS) Bootstrap(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) error {
112 if ownerDID == "" {