this repo has no description
at main 118 lines 2.4 kB view raw
1package indexer 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os" 8 "os/signal" 9 "syscall" 10 "time" 11 12 "github.com/bluesky-social/go-util/pkg/bus/consumer" 13 vyletkafka "github.com/vylet-app/go/bus/proto" 14 "github.com/vylet-app/go/database/client" 15) 16 17type Server struct { 18 logger *slog.Logger 19 20 consumer *consumer.Consumer[*vyletkafka.FirehoseEvent] 21 db *client.Client 22} 23 24type Args struct { 25 Logger *slog.Logger 26 27 BootstrapServers []string 28 InputTopic string 29 ConsumerGroup string 30 31 DatabaseHost string 32} 33 34func New(args *Args) (*Server, error) { 35 if args.Logger == nil { 36 args.Logger = slog.Default() 37 } 38 39 logger := args.Logger 40 41 db, err := client.New(&client.Args{ 42 Addr: args.DatabaseHost, 43 }) 44 if err != nil { 45 return nil, fmt.Errorf("failed to create a new database client: %w", err) 46 } 47 48 server := Server{ 49 logger: logger, 50 51 db: db, 52 } 53 54 busConsumer, err := consumer.New( 55 logger.With("component", "consumer"), 56 args.BootstrapServers, 57 args.InputTopic, 58 args.ConsumerGroup, 59 consumer.WithOffset[*vyletkafka.FirehoseEvent](consumer.OffsetStart), 60 consumer.WithMessageHandler(server.handleEvent), 61 ) 62 if err != nil { 63 return nil, fmt.Errorf("failed to create new consumer: %w", err) 64 } 65 server.consumer = busConsumer 66 67 return &server, nil 68} 69 70func (s *Server) Run(ctx context.Context) error { 71 logger := s.logger.With("name", "Run") 72 73 shutdownConsumer := make(chan struct{}, 1) 74 consumerShutdown := make(chan struct{}, 1) 75 consumerErr := make(chan error, 1) 76 go func() { 77 go func() { 78 if err := s.consumer.Consume(ctx); err != nil { 79 consumerErr <- err 80 } 81 }() 82 83 select { 84 case <-shutdownConsumer: 85 case err := <-consumerErr: 86 s.logger.Error("error consuming", "err", err) 87 } 88 89 s.consumer.Close() 90 91 close(consumerShutdown) 92 }() 93 94 signals := make(chan os.Signal, 1) 95 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 96 97 select { 98 case sig := <-signals: 99 logger.Info("received exit signal", "signal", sig) 100 close(shutdownConsumer) 101 case <-ctx.Done(): 102 logger.Info("context cancelled") 103 close(shutdownConsumer) 104 case <-consumerShutdown: 105 logger.Warn("consumer shut down unexpectedly") 106 } 107 108 _, cancel := context.WithTimeout(context.Background(), 10*time.Second) 109 defer cancel() 110 111 s.consumer.Close() 112 113 if err := s.db.Close(); err != nil { 114 logger.Error("failed to close database client", "err", err) 115 } 116 117 return nil 118}