A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
2// It connects to the Bluesky Jetstream WebSocket, processes repository events,
3// indexes manifests and tags, and populates the AppView database for the web UI.
4package jetstream
5
6import (
7 "context"
8 "database/sql"
9 "encoding/json"
10 "fmt"
11 "log/slog"
12 "net/url"
13 "sync"
14 "time"
15
16 "atcr.io/pkg/appview/db"
17 "atcr.io/pkg/atproto"
18 "github.com/gorilla/websocket"
19 "github.com/klauspost/compress/zstd"
20)
21
22// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups
23type UserCache struct {
24 cache map[string]*db.User
25}
26
// EventCallback is the hook signature the worker invokes for every event it
// handles. timeUS is the event's time_us value (the Jetstream cursor).
type EventCallback func(timeUS int64)
29
30// Worker consumes Jetstream events and populates the UI database
31type Worker struct {
32 db *sql.DB
33 jetstreamURL string
34 endpoints *EndpointRotator
35 startCursor int64
36 wantedCollections []string
37 debugCollectionCount int
38 processor *Processor // Shared processor for DB operations
39 statsCache *StatsCache // In-memory cache for stats aggregation across holds
40 eventCallback EventCallback
41 connStartTime time.Time // Track when connection started for debugging
42
43 // Ping/pong tracking for connection health
44 pingsSent int64
45 pongsReceived int64
46 lastPongTime time.Time
47 pongMutex sync.Mutex
48
49 // In-memory cursor tracking for reconnects
50 lastCursor int64
51 cursorMutex sync.RWMutex
52}
53
54// NewWorker creates a new Jetstream worker
55// startCursor: Unix microseconds timestamp to start from (0 = start from now)
56func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker {
57 if len(urls) == 0 {
58 urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"}
59 }
60
61 rotator := NewEndpointRotator(urls)
62
63 // Create shared stats cache for aggregating across holds
64 statsCache := NewStatsCache()
65
66 return &Worker{
67 db: database,
68 jetstreamURL: rotator.Current(),
69 endpoints: rotator,
70 startCursor: startCursor,
71 wantedCollections: []string{
72 "io.atcr.*", // Subscribe to all ATCR collections
73 "app.bsky.actor.profile", // Subscribe to Bluesky profile updates for avatar sync
74 },
75 statsCache: statsCache,
76 processor: NewProcessor(database, true, statsCache), // Use cache for live streaming
77 }
78}
79
80// Start begins consuming Jetstream events
81// This is a blocking function that runs until the context is cancelled
82func (w *Worker) Start(ctx context.Context) error {
83 // Build connection URL with filters
84 u, err := url.Parse(w.jetstreamURL)
85 if err != nil {
86 return fmt.Errorf("invalid jetstream URL: %w", err)
87 }
88
89 q := u.Query()
90 for _, collection := range w.wantedCollections {
91 q.Add("wantedCollections", collection)
92 }
93
94 // Add cursor if specified (for backfilling historical data or reconnects)
95 if w.startCursor > 0 {
96 q.Set("cursor", fmt.Sprintf("%d", w.startCursor))
97
98 // Calculate lag (cursor is in microseconds)
99 now := time.Now().UnixMicro()
100 lagSeconds := float64(now-w.startCursor) / 1_000_000.0
101 slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
102 }
103
104 // Disable compression for now to debug
105 // q.Set("compress", "true")
106 u.RawQuery = q.Encode()
107
108 slog.Info("Connecting to Jetstream", "url", u.String())
109
110 // Connect to Jetstream
111 conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
112 if err != nil {
113 return fmt.Errorf("failed to connect to jetstream: %w", err)
114 }
115 defer conn.Close()
116
117 // Track connection start time for debugging
118 w.connStartTime = time.Now()
119
120 // Reset ping/pong counters for this connection
121 w.pongMutex.Lock()
122 w.pingsSent = 0
123 w.pongsReceived = 0
124 w.lastPongTime = time.Now()
125 w.pongMutex.Unlock()
126
127 // Set up pong handler - called when server responds to our ping
128 conn.SetPongHandler(func(appData string) error {
129 w.pongMutex.Lock()
130 w.pongsReceived++
131 w.lastPongTime = time.Now()
132 w.pongMutex.Unlock()
133
134 // Reset read deadline - we know connection is alive
135 // Allow 90 seconds for next pong (3x ping interval)
136 return conn.SetReadDeadline(time.Now().Add(90 * time.Second))
137 })
138
139 // Set initial read deadline
140 if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil {
141 return fmt.Errorf("failed to set read deadline: %w", err)
142 }
143
144 // Create zstd decoder for decompressing messages
145 decoder, err := zstd.NewReader(nil)
146 if err != nil {
147 return fmt.Errorf("failed to create zstd decoder: %w", err)
148 }
149 defer decoder.Close()
150
151 slog.Info("Connected to Jetstream, listening for events...")
152
153 // Start heartbeat ticker to show Jetstream is alive
154 heartbeatTicker := time.NewTicker(30 * time.Second)
155 defer heartbeatTicker.Stop()
156
157 // Start ping ticker for keepalive
158 pingTicker := time.NewTicker(30 * time.Second)
159 defer pingTicker.Stop()
160
161 // Start ping sender goroutine
162 pingDone := make(chan struct{})
163 defer close(pingDone)
164
165 go func() {
166 for {
167 select {
168 case <-ctx.Done():
169 return
170 case <-pingDone:
171 return
172 case <-pingTicker.C:
173 // Check if we've received a pong recently
174 w.pongMutex.Lock()
175 timeSinceLastPong := time.Since(w.lastPongTime)
176 pingsTotal := w.pingsSent
177 pongsTotal := w.pongsReceived
178 w.pongMutex.Unlock()
179
180 // If no pong for 60 seconds, connection is likely dead
181 if timeSinceLastPong > 60*time.Second {
182 slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
183 conn.Close()
184 return
185 }
186
187 // Send ping with write deadline
188 if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
189 slog.Warn("Jetstream failed to set write deadline", "error", err)
190 conn.Close()
191 return
192 }
193 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
194 slog.Warn("Jetstream failed to send ping", "error", err)
195 conn.Close()
196 return
197 }
198
199 w.pongMutex.Lock()
200 w.pingsSent++
201 w.pongMutex.Unlock()
202 }
203 }
204 }()
205
206 eventCount := 0
207 lastHeartbeat := time.Now()
208
209 // Read messages
210 for {
211 select {
212 case <-ctx.Done():
213 return ctx.Err()
214 case <-heartbeatTicker.C:
215 elapsed := time.Since(lastHeartbeat)
216 slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
217 eventCount = 0
218 lastHeartbeat = time.Now()
219 default:
220 _, message, err := conn.ReadMessage()
221 if err != nil {
222 // Calculate connection duration and idle time for debugging
223 connDuration := time.Since(w.connStartTime)
224 timeSinceLastEvent := time.Since(lastHeartbeat)
225
226 // Get ping/pong stats
227 w.pongMutex.Lock()
228 pingsTotal := w.pingsSent
229 pongsTotal := w.pongsReceived
230 timeSinceLastPong := time.Since(w.lastPongTime)
231 w.pongMutex.Unlock()
232
233 // Calculate ping/pong success rate
234 var pongRate float64
235 if pingsTotal > 0 {
236 pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
237 }
238
239 // Determine diagnosis
240 var diagnosis string
241 if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
242 diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
243 } else if timeSinceLastPong > 60*time.Second {
244 diagnosis = "Connection died (no pong for >60s), network issue detected"
245 } else if pongRate < 80 {
246 diagnosis = "Connection unstable (low pong rate), network quality issues"
247 } else {
248 diagnosis = "Connection closed unexpectedly"
249 }
250
251 // Log detailed context about the failure
252 slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)
253
254 return fmt.Errorf("failed to read message: %w", err)
255 }
256
257 // For now, process uncompressed messages
258 // TODO: Re-enable compression once debugging is complete
259 _ = decoder // Keep decoder to avoid unused variable error
260
261 if err := w.processMessage(message); err != nil {
262 slog.Error("ERROR processing message", "error", err)
263 // Continue processing other messages
264 } else {
265 eventCount++
266 }
267 }
268 }
269}
270
271// StartWithFailover runs the Jetstream worker with automatic failover across endpoints.
272// On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s).
273// If all retries fail, it fails over to the next endpoint and rewinds the cursor
274// 30 seconds to avoid missing events (events are idempotent DB upserts).
275// Cycles through all endpoints indefinitely and never gives up.
276func (w *Worker) StartWithFailover(ctx context.Context) {
277 retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second}
278
279 for {
280 currentURL := w.endpoints.Current()
281 w.jetstreamURL = currentURL
282
283 slog.Info("Jetstream connecting", "url", currentURL)
284 err := w.Start(ctx)
285 if ctx.Err() != nil {
286 return // Context cancelled, clean shutdown
287 }
288
289 // Capture cursor at disconnect time for rewind calculation
290 disconnectCursor := w.GetLastCursor()
291
292 // Retry same endpoint with escalating delays
293 recovered := false
294 for i, delay := range retryDelays {
295 slog.Warn("Jetstream disconnected, retrying same endpoint",
296 "url", currentURL,
297 "attempt", i+1,
298 "delay", delay,
299 "error", err)
300 time.Sleep(delay)
301
302 if ctx.Err() != nil {
303 return
304 }
305
306 w.jetstreamURL = currentURL
307 err = w.Start(ctx)
308 if ctx.Err() != nil {
309 return
310 }
311 if err == nil {
312 recovered = true
313 break
314 }
315 // Update disconnect cursor if we got further
316 if latest := w.GetLastCursor(); latest > disconnectCursor {
317 disconnectCursor = latest
318 }
319 }
320
321 if recovered {
322 continue
323 }
324
325 // All retries failed — failover to next endpoint
326 failedURL := currentURL
327 nextURL := w.endpoints.Next()
328
329 // Rewind cursor 30 seconds (30M microseconds) to avoid gaps
330 if disconnectCursor > 0 {
331 rewound := disconnectCursor - 30_000_000
332 if rewound < 0 {
333 rewound = 0
334 }
335 w.cursorMutex.Lock()
336 w.lastCursor = rewound
337 w.startCursor = rewound
338 w.cursorMutex.Unlock()
339 slog.Warn("Jetstream failing over to next endpoint",
340 "failed_url", failedURL,
341 "next_url", nextURL,
342 "cursor_rewound_by", "30s")
343 } else {
344 slog.Warn("Jetstream failing over to next endpoint",
345 "failed_url", failedURL,
346 "next_url", nextURL)
347 }
348 }
349}
350
351// SetEventCallback sets a callback to be called for each event
352func (w *Worker) SetEventCallback(cb EventCallback) {
353 w.eventCallback = cb
354}
355
356// Processor returns the worker's processor for configuration (e.g., setting webhook dispatcher)
357func (w *Worker) Processor() *Processor {
358 return w.processor
359}
360
361// GetLastCursor returns the last processed cursor (time_us) for reconnects
362func (w *Worker) GetLastCursor() int64 {
363 w.cursorMutex.RLock()
364 defer w.cursorMutex.RUnlock()
365 return w.lastCursor
366}
367
368// processMessage processes a single Jetstream event
369func (w *Worker) processMessage(message []byte) error {
370 var event JetstreamEvent
371 if err := json.Unmarshal(message, &event); err != nil {
372 return fmt.Errorf("failed to unmarshal event: %w", err)
373 }
374
375 // Update cursor for reconnects (do this first, even if processing fails)
376 w.cursorMutex.Lock()
377 w.lastCursor = event.TimeUS
378 w.cursorMutex.Unlock()
379
380 // Call callback if set
381 if w.eventCallback != nil {
382 w.eventCallback(event.TimeUS)
383 }
384
385 // Process based on event kind
386 switch event.Kind {
387 case "commit":
388 commit := event.Commit
389 if commit == nil {
390 return nil
391 }
392
393 // Set DID on commit from parent event
394 commit.DID = event.DID
395
396 // Debug: log first few collections we see to understand what's coming through
397 if w.debugCollectionCount < 5 {
398 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
399 w.debugCollectionCount++
400 }
401
402 // Check if this is a collection we care about
403 if !isRelevantCollection(commit.Collection) {
404 return nil // Ignore irrelevant collections
405 }
406
407 // Marshal record to bytes for processing
408 var recordBytes []byte
409 if commit.Record != nil {
410 var err error
411 recordBytes, err = json.Marshal(commit.Record)
412 if err != nil {
413 return fmt.Errorf("failed to marshal record: %w", err)
414 }
415 }
416
417 // Handle Bluesky profile updates separately (for avatar sync)
418 if commit.Collection == BlueskyProfileCollection {
419 // Only process creates/updates, not deletes (we don't clear avatars on profile delete)
420 if commit.Operation == "delete" {
421 return nil
422 }
423 return w.processor.ProcessProfileUpdate(context.Background(), commit.DID, recordBytes)
424 }
425
426 // Log ATCR events (but not Bluesky profile events - too noisy)
427 slog.Info("Jetstream processing event",
428 "collection", commit.Collection,
429 "did", commit.DID,
430 "operation", commit.Operation,
431 "rkey", commit.RKey)
432
433 isDelete := commit.Operation == "delete"
434 return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil)
435
436 case "identity":
437 if event.Identity == nil {
438 return nil
439 }
440 return w.processIdentity(&event)
441
442 case "account":
443 if event.Account == nil {
444 return nil
445 }
446 return w.processAccount(&event)
447
448 default:
449 // Ignore unknown event kinds
450 return nil
451 }
452}
453
454// processIdentity processes an identity event (handle change)
455func (w *Worker) processIdentity(event *JetstreamEvent) error {
456 if event.Identity == nil {
457 return nil
458 }
459
460 identity := event.Identity
461 // Process via shared processor (only ATCR users will be logged at Info level)
462 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle)
463}
464
465// processAccount processes an account event (status change)
466func (w *Worker) processAccount(event *JetstreamEvent) error {
467 if event.Account == nil {
468 return nil
469 }
470
471 account := event.Account
472 // Process via shared processor (only ATCR users will be logged at Info level)
473 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status)
474}
475
// BlueskyProfileCollection names the Bluesky actor-profile collection, whose
// updates are consumed for avatar sync.
const BlueskyProfileCollection = "app.bsky.actor.profile"
478
479// isRelevantCollection returns true if the collection is one we process
480func isRelevantCollection(collection string) bool {
481 switch collection {
482 case atproto.ManifestCollection,
483 atproto.TagCollection,
484 atproto.StarCollection,
485 atproto.RepoPageCollection,
486 atproto.SailorProfileCollection,
487 atproto.StatsCollection,
488 atproto.CaptainCollection,
489 atproto.CrewCollection,
490 atproto.ScanCollection,
491 BlueskyProfileCollection: // For avatar sync
492 return true
493 default:
494 return false
495 }
496}
497
498// JetstreamEvent represents a Jetstream event
499type JetstreamEvent struct {
500 DID string `json:"did"`
501 TimeUS int64 `json:"time_us"`
502 Kind string `json:"kind"` // "commit", "identity", "account"
503 Commit *CommitEvent `json:"commit,omitempty"`
504 Identity *IdentityInfo `json:"identity,omitempty"`
505 Account *AccountInfo `json:"account,omitempty"`
506}
507
// CommitEvent is the payload of a "commit" event: one record create, update,
// or delete.
type CommitEvent struct {
	// Rev is the value of the "rev" field.
	Rev string `json:"rev"`
	// Operation is "create", "update", or "delete".
	Operation string `json:"operation"`
	// Collection is the collection the record belongs to.
	Collection string `json:"collection"`
	// RKey is the record key.
	RKey string `json:"rkey"`
	// Record is the decoded record body; nil when the event carries none.
	Record map[string]any `json:"record,omitempty"`
	// CID is the value of the "cid" field, when present.
	CID string `json:"cid,omitempty"`
	// DID is never read from or written to JSON; the worker copies it from the
	// parent event.
	DID string `json:"-"`
}
518
// IdentityInfo is the payload of an "identity" event (handle change).
type IdentityInfo struct {
	// DID identifies the account the event is about.
	DID string `json:"did"`
	// Handle is the handle carried by the event.
	Handle string `json:"handle"`
	// Seq is the value of the "seq" field.
	Seq int64 `json:"seq"`
	// Time is the "time" field kept as the raw string; it is not parsed here.
	Time string `json:"time"`
}
526
// AccountInfo is the payload of an "account" event (status change).
type AccountInfo struct {
	// Active is the value of the "active" field.
	Active bool `json:"active"`
	// DID identifies the account the event is about.
	DID string `json:"did"`
	// Seq is the value of the "seq" field.
	Seq int64 `json:"seq"`
	// Time is the "time" field kept as the raw string; it is not parsed here.
	Time string `json:"time"`
	// Status is the optional "status" field; it is omitted from JSON when empty.
	Status string `json:"status,omitempty"`
}