this repo has no description

request crawl when websocket dies

Hailey ee140be0 a4ae9988

Changed files
+67 -16
server
+31 -11
server/handle_sync_subscribe_repos.go
··· 1 package server 2 3 import ( 4 - "fmt" 5 6 "github.com/bluesky-social/indigo/events" 7 "github.com/bluesky-social/indigo/lex/util" ··· 10 ) 11 12 func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 13 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 14 if err != nil { 15 return err 16 } 17 - 18 - s.logger.Info("new connection", "ua", e.Request().UserAgent()) 19 - 20 - ctx := e.Request().Context() 21 22 ident := e.RealIP() + "-" + e.Request().UserAgent() 23 24 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 25 return true ··· 33 for evt := range evts { 34 wc, err := conn.NextWriter(websocket.BinaryMessage) 35 if err != nil { 36 - return err 37 } 38 39 - var obj util.CBOR 40 41 switch { 42 case evt.Error != nil: 43 header.Op = events.EvtKindErrorFrame ··· 55 header.MsgType = "#info" 56 obj = evt.RepoInfo 57 default: 58 - return fmt.Errorf("unrecognized event kind") 59 } 60 61 if err := header.MarshalCBOR(wc); err != nil { 62 - return fmt.Errorf("failed to write header: %w", err) 63 } 64 65 if err := obj.MarshalCBOR(wc); err != nil { 66 - return fmt.Errorf("failed to write event: %w", err) 67 } 68 69 if err := wc.Close(); err != nil { 70 - return fmt.Errorf("failed to flush-close our event write: %w", err) 71 } 72 } 73 74 return nil
··· 1 package server 2 3 import ( 4 + "context" 5 + "time" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/lex/util" ··· 11 ) 12 13 func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 14 + ctx := e.Request().Context() 15 + logger := s.logger.With("component", "subscribe-repos-websocket") 16 + 17 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 18 if err != nil { 19 + logger.Error("unable to establish websocket with relay", "err", err) 20 return err 21 } 22 23 ident := e.RealIP() + "-" + e.Request().UserAgent() 24 + logger = logger.With("ident", ident) 25 + logger.Info("new connection established") 26 27 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 28 return true ··· 36 for evt := range evts { 37 wc, err := conn.NextWriter(websocket.BinaryMessage) 38 if err != nil { 39 + logger.Error("error writing message to relay", "err", err) 40 + break 41 } 42 43 + if ctx.Err() != nil { 44 + logger.Error("context error", "err", err) 45 + break 46 + } 47 48 + var obj util.CBOR 49 switch { 50 case evt.Error != nil: 51 header.Op = events.EvtKindErrorFrame ··· 63 header.MsgType = "#info" 64 obj = evt.RepoInfo 65 default: 66 + logger.Warn("unrecognized event kind") 67 + return nil 68 } 69 70 if err := header.MarshalCBOR(wc); err != nil { 71 + logger.Error("failed to write header to relay", "err", err) 72 + break 73 } 74 75 if err := obj.MarshalCBOR(wc); err != nil { 76 + logger.Error("failed to write event to relay", "err", err) 77 + break 78 } 79 80 if err := wc.Close(); err != nil { 81 + logger.Error("failed to flush-close our event write", "err", err) 82 + break 83 } 84 + } 85 + 86 + // we should tell the relay to request a new crawl at this point if we got disconnected 87 + // use a new context since the old one might be cancelled at this point 88 + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) 89 + defer cancel() 90 + if err := s.requestCrawl(ctx); err != nil { 91 + logger.Error("error requesting crawls", "err", err) 92 } 93 94 return nil
+36 -5
server/server.go
··· 77 passport *identity.Passport 78 fallbackProxy string 79 80 dbName string 81 s3Config *S3Config 82 } ··· 518 519 go s.backupRoutine() 520 521 for _, relay := range s.config.Relays { 522 cli := xrpc.Client{Host: relay} 523 - atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 524 Hostname: s.config.Hostname, 525 - }) 526 } 527 528 - <-ctx.Done() 529 - 530 - fmt.Println("shut down") 531 532 return nil 533 }
··· 77 passport *identity.Passport 78 fallbackProxy string 79 80 + lastRequestCrawl time.Time 81 + requestCrawlMu sync.Mutex 82 + 83 dbName string 84 s3Config *S3Config 85 } ··· 521 522 go s.backupRoutine() 523 524 + go func() { 525 + if err := s.requestCrawl(ctx); err != nil { 526 + s.logger.Error("error requesting crawls", "err", err) 527 + } 528 + }() 529 + 530 + <-ctx.Done() 531 + 532 + fmt.Println("shut down") 533 + 534 + return nil 535 + } 536 + 537 + func (s *Server) requestCrawl(ctx context.Context) error { 538 + logger := s.logger.With("component", "request-crawl") 539 + s.requestCrawlMu.Lock() 540 + defer s.requestCrawlMu.Unlock() 541 + 542 + logger.Info("requesting crawl with configured relays") 543 + 544 + if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute { 545 + return fmt.Errorf("a crawl request has already been made within the last minute") 546 + } 547 + 548 for _, relay := range s.config.Relays { 549 + logger := logger.With("relay", relay) 550 + logger.Info("requesting crawl from relay") 551 cli := xrpc.Client{Host: relay} 552 + if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 553 Hostname: s.config.Hostname, 554 + }); err != nil { 555 + logger.Error("error requesting crawl", "err", err) 556 + } else { 557 + logger.Info("crawl requested successfully") 558 + } 559 } 560 561 + s.lastRequestCrawl = time.Now() 562 563 return nil 564 }