fix(inputs.kafka): switch to sarama's new consumer group rebalance strategy setting (#12064)
This commit is contained in:
parent
89cdfa35f7
commit
a6352d9794
|
|
@ -112,11 +112,11 @@ func (k *KafkaConsumer) Init() error {
|
||||||
|
|
||||||
switch strings.ToLower(k.BalanceStrategy) {
|
switch strings.ToLower(k.BalanceStrategy) {
|
||||||
case "range", "":
|
case "range", "":
|
||||||
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
|
cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
|
||||||
case "roundrobin":
|
case "roundrobin":
|
||||||
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
|
cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
|
||||||
case "sticky":
|
case "sticky":
|
||||||
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
|
cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy)
|
return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue