this repo has no description
1package indexer
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "syscall"
10 "time"
11
12 "github.com/bluesky-social/go-util/pkg/bus/consumer"
13 vyletkafka "github.com/vylet-app/go/bus/proto"
14 "github.com/vylet-app/go/database/client"
15)
16
17type Server struct {
18 logger *slog.Logger
19
20 consumer *consumer.Consumer[*vyletkafka.FirehoseEvent]
21 db *client.Client
22}
23
24type Args struct {
25 Logger *slog.Logger
26
27 BootstrapServers []string
28 InputTopic string
29 ConsumerGroup string
30
31 DatabaseHost string
32}
33
34func New(args *Args) (*Server, error) {
35 if args.Logger == nil {
36 args.Logger = slog.Default()
37 }
38
39 logger := args.Logger
40
41 db, err := client.New(&client.Args{
42 Addr: args.DatabaseHost,
43 })
44 if err != nil {
45 return nil, fmt.Errorf("failed to create a new database client: %w", err)
46 }
47
48 server := Server{
49 logger: logger,
50
51 db: db,
52 }
53
54 busConsumer, err := consumer.New(
55 logger.With("component", "consumer"),
56 args.BootstrapServers,
57 args.InputTopic,
58 args.ConsumerGroup,
59 consumer.WithOffset[*vyletkafka.FirehoseEvent](consumer.OffsetStart),
60 consumer.WithMessageHandler(server.handleEvent),
61 )
62 if err != nil {
63 return nil, fmt.Errorf("failed to create new consumer: %w", err)
64 }
65 server.consumer = busConsumer
66
67 return &server, nil
68}
69
70func (s *Server) Run(ctx context.Context) error {
71 logger := s.logger.With("name", "Run")
72
73 shutdownConsumer := make(chan struct{}, 1)
74 consumerShutdown := make(chan struct{}, 1)
75 consumerErr := make(chan error, 1)
76 go func() {
77 go func() {
78 if err := s.consumer.Consume(ctx); err != nil {
79 consumerErr <- err
80 }
81 }()
82
83 select {
84 case <-shutdownConsumer:
85 case err := <-consumerErr:
86 s.logger.Error("error consuming", "err", err)
87 }
88
89 s.consumer.Close()
90
91 close(consumerShutdown)
92 }()
93
94 signals := make(chan os.Signal, 1)
95 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
96
97 select {
98 case sig := <-signals:
99 logger.Info("received exit signal", "signal", sig)
100 close(shutdownConsumer)
101 case <-ctx.Done():
102 logger.Info("context cancelled")
103 close(shutdownConsumer)
104 case <-consumerShutdown:
105 logger.Warn("consumer shut down unexpectedly")
106 }
107
108 _, cancel := context.WithTimeout(context.Background(), 10*time.Second)
109 defer cancel()
110
111 s.consumer.Close()
112
113 if err := s.db.Close(); err != nil {
114 logger.Error("failed to close database client", "err", err)
115 }
116
117 return nil
118}