1package server
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10 "github.com/haileyok/cocoon/metrics"
11 "github.com/labstack/echo/v4"
12)
13
14func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
15 ctx := e.Request().Context()
16 logger := s.logger.With("component", "subscribe-repos-websocket")
17
18 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
19 if err != nil {
20 logger.Error("unable to establish websocket with relay", "err", err)
21 return err
22 }
23
24 ident := e.RealIP() + "-" + e.Request().UserAgent()
25 logger = logger.With("ident", ident)
26 logger.Info("new connection established")
27
28 metrics.RelaysConnected.WithLabelValues(ident).Inc()
29 defer func() {
30 metrics.RelaysConnected.WithLabelValues(ident).Dec()
31 }()
32
33 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
34 return true
35 }, nil)
36 if err != nil {
37 return err
38 }
39 defer cancel()
40
41 header := events.EventHeader{Op: events.EvtKindMessage}
42 for evt := range evts {
43 func() {
44 defer func() {
45 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc()
46 }()
47
48 wc, err := conn.NextWriter(websocket.BinaryMessage)
49 if err != nil {
50 logger.Error("error writing message to relay", "err", err)
51 return
52 }
53
54 if ctx.Err() != nil {
55 logger.Error("context error", "err", err)
56 return
57 }
58
59 var obj util.CBOR
60 switch {
61 case evt.Error != nil:
62 header.Op = events.EvtKindErrorFrame
63 obj = evt.Error
64 case evt.RepoCommit != nil:
65 header.MsgType = "#commit"
66 obj = evt.RepoCommit
67 case evt.RepoIdentity != nil:
68 header.MsgType = "#identity"
69 obj = evt.RepoIdentity
70 case evt.RepoAccount != nil:
71 header.MsgType = "#account"
72 obj = evt.RepoAccount
73 case evt.RepoInfo != nil:
74 header.MsgType = "#info"
75 obj = evt.RepoInfo
76 default:
77 logger.Warn("unrecognized event kind")
78 return
79 }
80
81 if err := header.MarshalCBOR(wc); err != nil {
82 logger.Error("failed to write header to relay", "err", err)
83 return
84 }
85
86 if err := obj.MarshalCBOR(wc); err != nil {
87 logger.Error("failed to write event to relay", "err", err)
88 return
89 }
90
91 if err := wc.Close(); err != nil {
92 logger.Error("failed to flush-close our event write", "err", err)
93 return
94 }
95 }()
96 }
97
98 // we should tell the relay to request a new crawl at this point if we got disconnected
99 // use a new context since the old one might be cancelled at this point
100 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
101 defer cancel()
102 if err := s.requestCrawl(ctx); err != nil {
103 logger.Error("error requesting crawls", "err", err)
104 }
105
106 return nil
107}