Skip to content

Commit

Permalink
Differentiate producer/consumer configuration (#16)
Browse files Browse the repository at this point in the history
The producer/consumer code was sharing the same configuration, which
means the producer was being initialized as a consumer as well. If the
producer starts first, it joins the same group and is assigned the only
partition for the topic. When the consumer eventually comes online, it
joins the group but is not assigned the partition, appearing to be
stuck. This patch bifurcates the configuration on whether or not the
Kafka client being stood up should be a consumer. Also, the PR adds a
few hygienic options that should always be set for consumers (read
committed, require stable offsets).
  • Loading branch information
rodaine authored Oct 7, 2024
1 parent 90309fd commit 40a12bb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cmd/bufstream-demo-consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {
}

func run(ctx context.Context, config app.Config) error {
client, err := kafka.NewKafkaClient(config.Kafka)
client, err := kafka.NewKafkaClient(config.Kafka, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bufstream-demo-produce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
}

func run(ctx context.Context, config app.Config) error {
client, err := kafka.NewKafkaClient(config.Kafka)
client, err := kafka.NewKafkaClient(config.Kafka, false)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ type Config struct {
}

// NewKafkaClient returns a new franz-go Kafka Client for the given Config.
func NewKafkaClient(config Config) (*kgo.Client, error) {
func NewKafkaClient(config Config, consumer bool) (*kgo.Client, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(config.BootstrapServers...),
kgo.ConsumerGroup(config.Group),
kgo.ConsumeTopics(config.Topic),
kgo.ClientID(config.ClientID),
kgo.AllowAutoTopicCreation(),
kgo.FetchMaxWait(time.Second),
}

if consumer {
opts = append(opts,
kgo.ConsumerGroup(config.Group),
kgo.ConsumeTopics(config.Topic),
kgo.FetchMaxWait(time.Second),
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
kgo.RequireStableFetchOffsets(),
)
}

if config.RootCAPath != "" {
Expand Down

0 comments on commit 40a12bb

Please sign in to comment.