A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
// It connects to the Bluesky Jetstream WebSocket, processes repository events,
// indexes manifests and tags, and populates the AppView database for the web UI.
4package jetstream
5
6import (
7 "context"
8 "database/sql"
9 "encoding/json"
10 "fmt"
11 "log/slog"
12 "net/url"
13 "sync"
14 "time"
15
16 "atcr.io/pkg/appview/db"
17 "atcr.io/pkg/atproto"
18 "github.com/gorilla/websocket"
19 "github.com/klauspost/compress/zstd"
20)
21
22// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups
23type UserCache struct {
24 cache map[string]*db.User
25}
26
// EventCallback is the signature of the hook a Worker invokes once per decoded
// event. timeUS is the event's time_us value.
type EventCallback func(timeUS int64)
29
30// Worker consumes Jetstream events and populates the UI database
31type Worker struct {
32 db *sql.DB
33 jetstreamURL string
34 startCursor int64
35 wantedCollections []string
36 debugCollectionCount int
37 processor *Processor // Shared processor for DB operations
38 eventCallback EventCallback
39 connStartTime time.Time // Track when connection started for debugging
40
41 // Ping/pong tracking for connection health
42 pingsSent int64
43 pongsReceived int64
44 lastPongTime time.Time
45 pongMutex sync.Mutex
46
47 // In-memory cursor tracking for reconnects
48 lastCursor int64
49 cursorMutex sync.RWMutex
50}
51
52// NewWorker creates a new Jetstream worker
53// startCursor: Unix microseconds timestamp to start from (0 = start from now)
54func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker {
55 if jetstreamURL == "" {
56 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
57 }
58
59 return &Worker{
60 db: database,
61 jetstreamURL: jetstreamURL,
62 startCursor: startCursor,
63 wantedCollections: []string{
64 "io.atcr.*", // Subscribe to all ATCR collections
65 },
66 processor: NewProcessor(database, true), // Use cache for live streaming
67 }
68}
69
70// Start begins consuming Jetstream events
71// This is a blocking function that runs until the context is cancelled
72func (w *Worker) Start(ctx context.Context) error {
73 // Build connection URL with filters
74 u, err := url.Parse(w.jetstreamURL)
75 if err != nil {
76 return fmt.Errorf("invalid jetstream URL: %w", err)
77 }
78
79 q := u.Query()
80 for _, collection := range w.wantedCollections {
81 q.Add("wantedCollections", collection)
82 }
83
84 // Add cursor if specified (for backfilling historical data or reconnects)
85 if w.startCursor > 0 {
86 q.Set("cursor", fmt.Sprintf("%d", w.startCursor))
87
88 // Calculate lag (cursor is in microseconds)
89 now := time.Now().UnixMicro()
90 lagSeconds := float64(now-w.startCursor) / 1_000_000.0
91 slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
92 }
93
94 // Disable compression for now to debug
95 // q.Set("compress", "true")
96 u.RawQuery = q.Encode()
97
98 slog.Info("Connecting to Jetstream", "url", u.String())
99
100 // Connect to Jetstream
101 conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
102 if err != nil {
103 return fmt.Errorf("failed to connect to jetstream: %w", err)
104 }
105 defer conn.Close()
106
107 // Track connection start time for debugging
108 w.connStartTime = time.Now()
109
110 // Reset ping/pong counters for this connection
111 w.pongMutex.Lock()
112 w.pingsSent = 0
113 w.pongsReceived = 0
114 w.lastPongTime = time.Now()
115 w.pongMutex.Unlock()
116
117 // Set up pong handler - called when server responds to our ping
118 conn.SetPongHandler(func(appData string) error {
119 w.pongMutex.Lock()
120 w.pongsReceived++
121 w.lastPongTime = time.Now()
122 w.pongMutex.Unlock()
123
124 // Reset read deadline - we know connection is alive
125 // Allow 90 seconds for next pong (3x ping interval)
126 conn.SetReadDeadline(time.Now().Add(90 * time.Second))
127 return nil
128 })
129
130 // Set initial read deadline
131 conn.SetReadDeadline(time.Now().Add(90 * time.Second))
132
133 // Create zstd decoder for decompressing messages
134 decoder, err := zstd.NewReader(nil)
135 if err != nil {
136 return fmt.Errorf("failed to create zstd decoder: %w", err)
137 }
138 defer decoder.Close()
139
140 slog.Info("Connected to Jetstream, listening for events...")
141
142 // Start heartbeat ticker to show Jetstream is alive
143 heartbeatTicker := time.NewTicker(30 * time.Second)
144 defer heartbeatTicker.Stop()
145
146 // Start ping ticker for keepalive
147 pingTicker := time.NewTicker(30 * time.Second)
148 defer pingTicker.Stop()
149
150 // Start ping sender goroutine
151 pingDone := make(chan struct{})
152 defer close(pingDone)
153
154 go func() {
155 for {
156 select {
157 case <-ctx.Done():
158 return
159 case <-pingDone:
160 return
161 case <-pingTicker.C:
162 // Check if we've received a pong recently
163 w.pongMutex.Lock()
164 timeSinceLastPong := time.Since(w.lastPongTime)
165 pingsTotal := w.pingsSent
166 pongsTotal := w.pongsReceived
167 w.pongMutex.Unlock()
168
169 // If no pong for 60 seconds, connection is likely dead
170 if timeSinceLastPong > 60*time.Second {
171 slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
172 conn.Close()
173 return
174 }
175
176 // Send ping with write deadline
177 conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
178 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
179 slog.Warn("Jetstream failed to send ping", "error", err)
180 conn.Close()
181 return
182 }
183
184 w.pongMutex.Lock()
185 w.pingsSent++
186 w.pongMutex.Unlock()
187 }
188 }
189 }()
190
191 eventCount := 0
192 lastHeartbeat := time.Now()
193
194 // Read messages
195 for {
196 select {
197 case <-ctx.Done():
198 return ctx.Err()
199 case <-heartbeatTicker.C:
200 elapsed := time.Since(lastHeartbeat)
201 slog.Info("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
202 eventCount = 0
203 lastHeartbeat = time.Now()
204 default:
205 _, message, err := conn.ReadMessage()
206 if err != nil {
207 // Calculate connection duration and idle time for debugging
208 connDuration := time.Since(w.connStartTime)
209 timeSinceLastEvent := time.Since(lastHeartbeat)
210
211 // Get ping/pong stats
212 w.pongMutex.Lock()
213 pingsTotal := w.pingsSent
214 pongsTotal := w.pongsReceived
215 timeSinceLastPong := time.Since(w.lastPongTime)
216 w.pongMutex.Unlock()
217
218 // Calculate ping/pong success rate
219 var pongRate float64
220 if pingsTotal > 0 {
221 pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
222 }
223
224 // Determine diagnosis
225 var diagnosis string
226 if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
227 diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
228 } else if timeSinceLastPong > 60*time.Second {
229 diagnosis = "Connection died (no pong for >60s), network issue detected"
230 } else if pongRate < 80 {
231 diagnosis = "Connection unstable (low pong rate), network quality issues"
232 } else {
233 diagnosis = "Connection closed unexpectedly"
234 }
235
236 // Log detailed context about the failure
237 slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)
238
239 return fmt.Errorf("failed to read message: %w", err)
240 }
241
242 // For now, process uncompressed messages
243 // TODO: Re-enable compression once debugging is complete
244 _ = decoder // Keep decoder to avoid unused variable error
245
246 if err := w.processMessage(message); err != nil {
247 slog.Error("ERROR processing message", "error", err)
248 // Continue processing other messages
249 } else {
250 eventCount++
251 }
252 }
253 }
254}
255
256// SetEventCallback sets a callback to be called for each event
257func (w *Worker) SetEventCallback(cb EventCallback) {
258 w.eventCallback = cb
259}
260
261// GetLastCursor returns the last processed cursor (time_us) for reconnects
262func (w *Worker) GetLastCursor() int64 {
263 w.cursorMutex.RLock()
264 defer w.cursorMutex.RUnlock()
265 return w.lastCursor
266}
267
268// processMessage processes a single Jetstream event
269func (w *Worker) processMessage(message []byte) error {
270 var event JetstreamEvent
271 if err := json.Unmarshal(message, &event); err != nil {
272 return fmt.Errorf("failed to unmarshal event: %w", err)
273 }
274
275 // Update cursor for reconnects (do this first, even if processing fails)
276 w.cursorMutex.Lock()
277 w.lastCursor = event.TimeUS
278 w.cursorMutex.Unlock()
279
280 // Call callback if set
281 if w.eventCallback != nil {
282 w.eventCallback(event.TimeUS)
283 }
284
285 // Process based on event kind
286 switch event.Kind {
287 case "commit":
288 commit := event.Commit
289 if commit == nil {
290 return nil
291 }
292
293 // Set DID on commit from parent event
294 commit.DID = event.DID
295
296 // Debug: log first few collections we see to understand what's coming through
297 if w.debugCollectionCount < 5 {
298 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
299 w.debugCollectionCount++
300 }
301
302 // Process based on collection
303 switch commit.Collection {
304 case atproto.ManifestCollection:
305 slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
306 return w.processManifest(commit)
307 case atproto.TagCollection:
308 slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
309 return w.processTag(commit)
310 case atproto.StarCollection:
311 slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
312 return w.processStar(commit)
313 case atproto.RepoPageCollection:
314 slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
315 return w.processRepoPage(commit)
316 default:
317 // Ignore other collections
318 return nil
319 }
320
321 case "identity":
322 if event.Identity == nil {
323 return nil
324 }
325 return w.processIdentity(&event)
326
327 case "account":
328 if event.Account == nil {
329 return nil
330 }
331 return w.processAccount(&event)
332
333 default:
334 // Ignore unknown event kinds
335 return nil
336 }
337}
338
339// processManifest processes a manifest commit event
340func (w *Worker) processManifest(commit *CommitEvent) error {
341 // Resolve and upsert user with handle/PDS endpoint
342 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
343 return fmt.Errorf("failed to ensure user: %w", err)
344 }
345
346 if commit.Operation == "delete" {
347 // Delete manifest - rkey is just the digest, repository is not encoded
348 digest := commit.RKey
349 if err := db.DeleteManifest(w.db, commit.DID, "", digest); err != nil {
350 return err
351 }
352 // Clean up any orphaned tags pointing to this manifest
353 return db.CleanupOrphanedTags(w.db, commit.DID)
354 }
355
356 // Parse manifest record
357 if commit.Record == nil {
358 return nil // No record data, can't process
359 }
360
361 // Marshal map to bytes for processing
362 recordBytes, err := json.Marshal(commit.Record)
363 if err != nil {
364 return fmt.Errorf("failed to marshal record: %w", err)
365 }
366
367 // Use shared processor for DB operations
368 _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes)
369 return err
370}
371
372// processTag processes a tag commit event
373func (w *Worker) processTag(commit *CommitEvent) error {
374 // Resolve and upsert user with handle/PDS endpoint
375 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
376 return fmt.Errorf("failed to ensure user: %w", err)
377 }
378
379 if commit.Operation == "delete" {
380 // Delete tag - decode rkey back to repository and tag
381 repo, tag := atproto.RKeyToRepositoryTag(commit.RKey)
382 slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey)
383 if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil {
384 slog.Error("Jetstream ERROR deleting tag", "error", err)
385 return err
386 }
387 slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag)
388 return nil
389 }
390
391 // Parse tag record
392 if commit.Record == nil {
393 return nil
394 }
395
396 // Marshal map to bytes for processing
397 recordBytes, err := json.Marshal(commit.Record)
398 if err != nil {
399 return fmt.Errorf("failed to marshal record: %w", err)
400 }
401
402 // Use shared processor for DB operations
403 return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes)
404}
405
406// processStar processes a star commit event
407func (w *Worker) processStar(commit *CommitEvent) error {
408 // Resolve and upsert the user who starred (starrer)
409 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
410 return fmt.Errorf("failed to ensure user: %w", err)
411 }
412
413 if commit.Operation == "delete" {
414 // Unstar - parse the rkey to get the subject (owner DID and repository)
415 // Delete events don't include the full record, but the rkey contains the info we need
416 ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey)
417 if err != nil {
418 return fmt.Errorf("failed to parse star rkey: %w", err)
419 }
420
421 // Delete the star record
422 return db.DeleteStar(w.db, commit.DID, ownerDID, repository)
423 }
424
425 // Parse star record
426 if commit.Record == nil {
427 return nil
428 }
429
430 // Marshal map to bytes for processing
431 recordBytes, err := json.Marshal(commit.Record)
432 if err != nil {
433 return fmt.Errorf("failed to marshal record: %w", err)
434 }
435
436 // Use shared processor for DB operations
437 return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes)
438}
439
440// processRepoPage processes a repo page commit event
441func (w *Worker) processRepoPage(commit *CommitEvent) error {
442 // Resolve and upsert user with handle/PDS endpoint
443 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil {
444 return fmt.Errorf("failed to ensure user: %w", err)
445 }
446
447 isDelete := commit.Operation == "delete"
448
449 if isDelete {
450 // Delete - rkey is the repository name
451 slog.Info("Jetstream deleting repo page", "did", commit.DID, "repository", commit.RKey)
452 if err := w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, nil, true); err != nil {
453 slog.Error("Jetstream ERROR deleting repo page", "error", err)
454 return err
455 }
456 slog.Info("Jetstream successfully deleted repo page", "did", commit.DID, "repository", commit.RKey)
457 return nil
458 }
459
460 // Parse repo page record
461 if commit.Record == nil {
462 return nil
463 }
464
465 // Marshal map to bytes for processing
466 recordBytes, err := json.Marshal(commit.Record)
467 if err != nil {
468 return fmt.Errorf("failed to marshal record: %w", err)
469 }
470
471 // Use shared processor for DB operations
472 return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false)
473}
474
475// processIdentity processes an identity event (handle change)
476func (w *Worker) processIdentity(event *JetstreamEvent) error {
477 if event.Identity == nil {
478 return nil
479 }
480
481 identity := event.Identity
482 // Process via shared processor (only ATCR users will be logged at Info level)
483 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle)
484}
485
486// processAccount processes an account event (status change)
487func (w *Worker) processAccount(event *JetstreamEvent) error {
488 if event.Account == nil {
489 return nil
490 }
491
492 account := event.Account
493 // Process via shared processor (only ATCR users will be logged at Info level)
494 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status)
495}
496
497// JetstreamEvent represents a Jetstream event
498type JetstreamEvent struct {
499 DID string `json:"did"`
500 TimeUS int64 `json:"time_us"`
501 Kind string `json:"kind"` // "commit", "identity", "account"
502 Commit *CommitEvent `json:"commit,omitempty"`
503 Identity *IdentityInfo `json:"identity,omitempty"`
504 Account *AccountInfo `json:"account,omitempty"`
505}
506
// CommitEvent is the commit payload of a Jetstream event: a create, update, or
// delete of one record, addressed by collection and record key. DID is not
// part of the JSON; it is filled in from the enclosing event.
type CommitEvent struct {
	Rev        string         `json:"rev"`
	Operation  string         `json:"operation"` // "create", "update", "delete"
	Collection string         `json:"collection"`
	RKey       string         `json:"rkey"`
	Record     map[string]any `json:"record,omitempty"`
	CID        string         `json:"cid,omitempty"`
	DID        string         `json:"-"` // Set from parent event
}
517
// IdentityInfo is the identity payload of a Jetstream event, carrying the DID
// and its handle along with the event's sequence number and time string.
type IdentityInfo struct {
	DID    string `json:"did"`
	Handle string `json:"handle"`
	Seq    int64  `json:"seq"`
	Time   string `json:"time"`
}
525
// AccountInfo is the account payload of a Jetstream event, carrying the DID,
// whether the account is active, and an optional status string, along with the
// event's sequence number and time string.
type AccountInfo struct {
	Active bool   `json:"active"`
	DID    string `json:"did"`
	Seq    int64  `json:"seq"`
	Time   string `json:"time"`
	Status string `json:"status,omitempty"`
}