tangled
alpha
login
or
join now
hailey.at
/
at-kafka
3
fork
atom
this repo has no description
3
fork
atom
overview
issues
pulls
pipelines
simplify producerl ogic
hailey.at
3 months ago
931e3edd
e632490a
+32
-45
1 changed file
expand all
collapse all
unified
split
atkafka
atkafka.go
+32
-45
atkafka/atkafka.go
···
32
ospreyCompat bool
33
logger *slog.Logger
34
35
-
atkProducer *Producer
36
-
ospProducer *Producer
37
38
plcClient *PlcClient
39
}
···
70
}
71
72
func (s *Server) Run(ctx context.Context) error {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
73
wsDialer := websocket.DefaultDialer
74
u, err := url.Parse(s.relayHost)
75
if err != nil {
76
return fmt.Errorf("invalid relayHost: %w", err)
77
}
78
u.Path = "/xrpc/com.atproto.sync.subscribeRepos"
0
79
80
wsErr := make(chan error, 1)
81
shutdownWs := make(chan struct{}, 1)
···
109
110
wsErr <- nil
111
}()
112
-
113
-
producerLogger := s.logger.With("component", "producer")
114
-
if s.ospreyCompat {
115
-
kafProducer, err := NewProducer(ctx, producerLogger, s.bootstrapServers, s.outputTopic,
116
-
WithEnsureTopic(true),
117
-
WithTopicPartitions(200),
118
-
)
119
-
if err != nil {
120
-
return fmt.Errorf("failed to create producer: %w", err)
121
-
}
122
-
defer kafProducer.Close()
123
-
s.ospProducer = kafProducer
124
-
} else {
125
-
kafProducer, err := NewProducer(ctx, producerLogger, s.bootstrapServers, s.outputTopic,
126
-
WithEnsureTopic(true),
127
-
WithTopicPartitions(200),
128
-
)
129
-
if err != nil {
130
-
return fmt.Errorf("failed to create producer: %w", err)
131
-
}
132
-
defer kafProducer.Close()
133
-
s.atkProducer = kafProducer
134
-
}
135
136
signals := make(chan os.Signal, 1)
137
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
···
432
}
433
434
func (s *Server) produceAsync(ctx context.Context, key string, msg []byte) error {
435
-
if !s.ospreyCompat && s.atkProducer != nil {
436
-
if err := s.atkProducer.ProduceAsync(ctx, key, msg, func(r *kgo.Record, err error) {
437
-
status := "ok"
438
-
if err != nil {
439
-
status = "error"
440
-
s.logger.Error("error producing message", "err", err)
441
-
}
442
-
producedEvents.WithLabelValues(status).Inc()
443
-
}); err != nil {
0
0
444
return fmt.Errorf("failed to produce message: %w", err)
445
}
446
-
} else if s.ospreyCompat && s.ospProducer != nil {
447
-
if err := s.ospProducer.ProduceAsync(ctx, key, msg, func(r *kgo.Record, err error) {
448
-
status := "ok"
449
-
if err != nil {
450
-
status = "error"
451
-
s.logger.Error("error producing message", "err", err)
452
-
}
453
-
producedEvents.WithLabelValues(status).Inc()
454
-
}); err != nil {
455
return fmt.Errorf("failed to produce message: %w", err)
456
}
457
-
} else {
458
-
return fmt.Errorf("failed to produce message. no compatible producer registered.")
459
}
460
461
return nil
···
32
ospreyCompat bool
33
logger *slog.Logger
34
35
+
producer *Producer
0
36
37
plcClient *PlcClient
38
}
···
69
}
70
71
func (s *Server) Run(ctx context.Context) error {
72
+
s.logger.Info("starting consumer", "relay-host", s.relayHost, "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic)
73
+
74
+
createCtx, _ := context.WithTimeout(ctx, time.Second*5)
75
+
76
+
producerLogger := s.logger.With("component", "producer")
77
+
kafProducer, err := NewProducer(createCtx, producerLogger, s.bootstrapServers, s.outputTopic,
78
+
WithEnsureTopic(true),
79
+
WithTopicPartitions(200),
80
+
)
81
+
if err != nil {
82
+
return fmt.Errorf("failed to create producer: %w", err)
83
+
}
84
+
defer kafProducer.Close()
85
+
s.producer = kafProducer
86
+
s.logger.Info("created producer")
87
+
88
wsDialer := websocket.DefaultDialer
89
u, err := url.Parse(s.relayHost)
90
if err != nil {
91
return fmt.Errorf("invalid relayHost: %w", err)
92
}
93
u.Path = "/xrpc/com.atproto.sync.subscribeRepos"
94
+
s.logger.Info("created dialer")
95
96
wsErr := make(chan error, 1)
97
shutdownWs := make(chan struct{}, 1)
···
125
126
wsErr <- nil
127
}()
128
+
s.logger.Info("created relay consumer")
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
129
130
signals := make(chan os.Signal, 1)
131
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
···
426
}
427
428
func (s *Server) produceAsync(ctx context.Context, key string, msg []byte) error {
429
+
callback := func(r *kgo.Record, err error) {
430
+
status := "ok"
431
+
if err != nil {
432
+
status = "error"
433
+
s.logger.Error("error producing message", "err", err)
434
+
}
435
+
producedEvents.WithLabelValues(status).Inc()
436
+
}
437
+
438
+
if !s.ospreyCompat {
439
+
if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
440
return fmt.Errorf("failed to produce message: %w", err)
441
}
442
+
} else if s.ospreyCompat {
443
+
if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
0
0
0
0
0
0
0
444
return fmt.Errorf("failed to produce message: %w", err)
445
}
0
0
446
}
447
448
return nil