this repo has no description
at v0.7.2 2.8 kB view raw
1package server 2 3import ( 4 "context" 5 "time" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/lex/util" 9 "github.com/btcsuite/websocket" 10 "github.com/haileyok/cocoon/metrics" 11 "github.com/labstack/echo/v4" 12) 13 14func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 15 ctx := e.Request().Context() 16 logger := s.logger.With("component", "subscribe-repos-websocket") 17 18 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 19 if err != nil { 20 logger.Error("unable to establish websocket with relay", "err", err) 21 return err 22 } 23 24 ident := e.RealIP() + "-" + e.Request().UserAgent() 25 logger = logger.With("ident", ident) 26 logger.Info("new connection established") 27 28 metrics.RelaysConnected.WithLabelValues(ident).Inc() 29 defer func() { 30 metrics.RelaysConnected.WithLabelValues(ident).Dec() 31 }() 32 33 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 34 return true 35 }, nil) 36 if err != nil { 37 return err 38 } 39 defer cancel() 40 41 header := events.EventHeader{Op: events.EvtKindMessage} 42 for evt := range evts { 43 func() { 44 defer func() { 45 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc() 46 }() 47 48 wc, err := conn.NextWriter(websocket.BinaryMessage) 49 if err != nil { 50 logger.Error("error writing message to relay", "err", err) 51 return 52 } 53 54 if ctx.Err() != nil { 55 logger.Error("context error", "err", err) 56 return 57 } 58 59 var obj util.CBOR 60 switch { 61 case evt.Error != nil: 62 header.Op = events.EvtKindErrorFrame 63 obj = evt.Error 64 case evt.RepoCommit != nil: 65 header.MsgType = "#commit" 66 obj = evt.RepoCommit 67 case evt.RepoIdentity != nil: 68 header.MsgType = "#identity" 69 obj = evt.RepoIdentity 70 case evt.RepoAccount != nil: 71 header.MsgType = "#account" 72 obj = evt.RepoAccount 73 case evt.RepoInfo != nil: 74 header.MsgType = "#info" 75 obj = evt.RepoInfo 76 default: 77 logger.Warn("unrecognized event kind") 78 return 79 } 80 81 if err := header.MarshalCBOR(wc); err != nil { 82 logger.Error("failed to write header to relay", "err", err) 83 return 84 } 85 86 if err := obj.MarshalCBOR(wc); err != nil { 87 logger.Error("failed to write event to relay", "err", err) 88 return 89 } 90 91 if err := wc.Close(); err != nil { 92 logger.Error("failed to flush-close our event write", "err", err) 93 return 94 } 95 }() 96 } 97 98 // we should tell the relay to request a new crawl at this point if we got disconnected 99 // use a new context since the old one might be cancelled at this point 100 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) 101 defer cancel() 102 if err := s.requestCrawl(ctx); err != nil { 103 logger.Error("error requesting crawls", "err", err) 104 } 105 106 return nil 107}