// Kafka configuration: broker/topic lists plus consumer and producer tuning.
// File: dataRT/config/kafka.go
package config
// kafkaConsumer holds consumer-side settings, populated from JSON or YAML.
type kafkaConsumer struct {
	// GroupID is the consumer-group identifier used for offset coordination.
	GroupID string `json:"groupid" yaml:"groupid"`
	// InitialOffset selects where consumption starts when no committed
	// offset exists. Only two sentinel values are accepted (enforced by
	// SetInitialOffset):
	//
	// OffsetNewest, -1, stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	//
	// OffsetOldest, -2, stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	InitialOffset int64 `json:"initialoffset" yaml:"initialoffset"`
}
// kafkaProducer holds producer-side settings, populated from JSON or YAML.
type kafkaProducer struct {
	// RequiredAcks controls broker acknowledgement before a send is
	// considered complete. Valid values are -1, 0, 1 (enforced by
	// SetRequiredAcks):
	//
	// NoResponse, 0, doesn't send any response, the TCP ACK is all you get.
	//
	// WaitForLocal, 1, waits for only the local commit to succeed before responding.
	//
	// WaitForAll, -1, waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	RequiredAcks int16 `json:"requiredacks" yaml:"requiredacks"`
	// Partitioner names the partition-selection strategy, one of:
	// manual/random/roundrobin/customhash/hash/referencehash/consistentcrchash
	Partitioner string `json:"partitioner" yaml:"partitioner"`
	// ReturnSuccesses, when true, reports successful deliveries back to the caller.
	ReturnSuccesses bool `json:"returnsuccesses" yaml:"returnsuccesses"`
	// ReturnErrors, when true, reports failed deliveries back to the caller.
	ReturnErrors bool `json:"returnerrors" yaml:"returnerrors"`
	// Compression selects the message compression codec:
	// CompressionNone, 0, no compression
	// CompressionGZIP, 1, compression using GZIP
	// CompressionSnappy, 2, compression using snappy
	// CompressionLZ4, 3, compression using LZ4
	// CompressionZSTD, 4, compression using ZSTD
	Compression int8 `json:"compression" yaml:"compression"`
}
// kafkaConfig is the root Kafka configuration: the broker endpoints, the
// topics of interest, and optional consumer/producer sections (nil when the
// corresponding section is absent from the config file).
type kafkaConfig struct {
	Brokers []string `json:"brokers" yaml:"brokers"`
	Topics []string `json:"topics" yaml:"topics"`
	Consumer *kafkaConsumer `json:"consumer" yaml:"consumer"`
	Producer *kafkaProducer `json:"producer" yaml:"producer"`
}
// NewKafkaConfig returns an empty kafka configuration ready to be populated
// by a decoder or the Set*/Init* accessors.
func NewKafkaConfig() *kafkaConfig {
	return &kafkaConfig{}
}
// GetBrokers returns the configured broker address list.
// It panics when called on a nil config.
func (conf *kafkaConfig) GetBrokers() []string {
	if conf != nil {
		return conf.Brokers
	}
	panic("kafka config is nil")
}
// SetBrokers replaces the broker address list.
// It panics when called on a nil config.
func (conf *kafkaConfig) SetBrokers(brokers []string) {
	if conf != nil {
		conf.Brokers = brokers
		return
	}
	panic("kafka config is nil")
}
// GetTopics returns the configured topic list.
// It panics when called on a nil config.
func (conf *kafkaConfig) GetTopics() []string {
	if conf != nil {
		return conf.Topics
	}
	panic("kafka config is nil")
}
// SetTopics replaces the topic list.
// It panics when called on a nil config.
func (conf *kafkaConfig) SetTopics(topics []string) {
	if conf != nil {
		conf.Topics = topics
		return
	}
	panic("kafka config is nil")
}
// GetConsumer returns the consumer section; nil if it was never initialized.
// It panics when called on a nil config.
func (conf *kafkaConfig) GetConsumer() *kafkaConsumer {
	if conf != nil {
		return conf.Consumer
	}
	panic("kafka config is nil")
}
// InitConsumer installs a fresh, zero-valued consumer section (replacing any
// existing one) and returns it. It panics when called on a nil config.
func (conf *kafkaConfig) InitConsumer() *kafkaConsumer {
	if conf == nil {
		panic("kafka config is nil")
	}
	consumer := &kafkaConsumer{}
	conf.Consumer = consumer
	return consumer
}
// GetGroupID returns the consumer-group identifier.
// It panics when called on a nil consumer section.
func (conf *kafkaConsumer) GetGroupID() string {
	if conf != nil {
		return conf.GroupID
	}
	panic("kafka consumer is nil")
}
// SetGroupID replaces the consumer-group identifier.
// It panics when called on a nil consumer section.
func (conf *kafkaConsumer) SetGroupID(groupID string) {
	if conf != nil {
		conf.GroupID = groupID
		return
	}
	panic("kafka consumer is nil")
}
// GetInitialOffset returns the starting-offset sentinel (-1 newest, -2 oldest).
// It panics when called on a nil consumer section.
func (conf *kafkaConsumer) GetInitialOffset() int64 {
	if conf != nil {
		return conf.InitialOffset
	}
	panic("kafka consumer is nil")
}
// SetInitialOffset stores the starting-offset sentinel. Only -1 (newest) and
// -2 (oldest) are accepted; any other value panics. It also panics when
// called on a nil consumer section.
func (conf *kafkaConsumer) SetInitialOffset(initialOffset int64) {
	if conf == nil {
		panic("kafka consumer is nil")
	}
	switch initialOffset {
	case -1, -2:
		conf.InitialOffset = initialOffset
	default:
		panic("initialOffset is invalid")
	}
}
// GetProducer returns the producer section; nil if it was never initialized.
// It panics when called on a nil config.
func (conf *kafkaConfig) GetProducer() *kafkaProducer {
	if conf != nil {
		return conf.Producer
	}
	panic("kafka config is nil")
}
// InitProducer installs a fresh, zero-valued producer section (replacing any
// existing one) and returns it. It panics when called on a nil config.
func (conf *kafkaConfig) InitProducer() *kafkaProducer {
	if conf == nil {
		panic("kafka config is nil")
	}
	producer := &kafkaProducer{}
	conf.Producer = producer
	return producer
}
// GetRequiredAcks returns the acknowledgement level (-1 all ISR, 0 none, 1 local).
// It panics when called on a nil producer section.
func (conf *kafkaProducer) GetRequiredAcks() int16 {
	if conf != nil {
		return conf.RequiredAcks
	}
	panic("kafka producer is nil")
}
// SetRequiredAcks stores the acknowledgement level. Only -1 (all ISR),
// 0 (none) and 1 (local) are accepted; any other value panics. It also
// panics when called on a nil producer section.
func (conf *kafkaProducer) SetRequiredAcks(requiredAcks int16) {
	if conf == nil {
		panic("kafka producer is nil")
	}
	switch requiredAcks {
	case -1, 0, 1:
		conf.RequiredAcks = requiredAcks
	default:
		panic("requiredAcks is invalid")
	}
}
// GetPartitioner returns the partition-selection strategy name.
// It panics when called on a nil producer section.
func (conf *kafkaProducer) GetPartitioner() string {
	if conf != nil {
		return conf.Partitioner
	}
	panic("kafka producer is nil")
}
// SetPartitioner replaces the partition-selection strategy name.
// NOTE(review): unlike the numeric setters, this performs no validation
// against the documented strategy names — confirm downstream handles
// unknown values. It panics when called on a nil producer section.
func (conf *kafkaProducer) SetPartitioner(partitioner string) {
	if conf != nil {
		conf.Partitioner = partitioner
		return
	}
	panic("kafka producer is nil")
}
// GetReturnSuccesses reports whether successful deliveries are returned.
// It panics when called on a nil producer section.
func (conf *kafkaProducer) GetReturnSuccesses() bool {
	if conf != nil {
		return conf.ReturnSuccesses
	}
	panic("kafka producer is nil")
}
// SetReturnSuccesses toggles reporting of successful deliveries.
// It panics when called on a nil producer section.
func (conf *kafkaProducer) SetReturnSuccesses(returnSuccesses bool) {
	if conf != nil {
		conf.ReturnSuccesses = returnSuccesses
		return
	}
	panic("kafka producer is nil")
}
// GetReturnErrors reports whether failed deliveries are returned.
// It panics when called on a nil producer section.
func (conf *kafkaProducer) GetReturnErrors() bool {
	if conf != nil {
		return conf.ReturnErrors
	}
	panic("kafka producer is nil")
}
// SetReturnErrors toggles reporting of failed deliveries.
// It panics when called on a nil producer section.
func (conf *kafkaProducer) SetReturnErrors(returnErrors bool) {
	if conf != nil {
		conf.ReturnErrors = returnErrors
		return
	}
	panic("kafka producer is nil")
}
// GetCompression returns the compression codec (0 none .. 4 ZSTD).
// It panics when called on a nil producer section.
func (conf *kafkaProducer) GetCompression() int8 {
	if conf != nil {
		return conf.Compression
	}
	panic("kafka producer is nil")
}
// SetCompression stores the compression codec after validating it.
// Valid codecs are the enumerated values 0 (none) through 4 (ZSTD); any
// other value panics, matching the validation style of SetInitialOffset
// and SetRequiredAcks (previously this setter accepted any int8, allowing
// meaningless codecs to reach the producer). It also panics when called
// on a nil producer section.
func (conf *kafkaProducer) SetCompression(compression int8) {
	if conf == nil {
		panic("kafka producer is nil")
	}
	if compression < 0 || compression > 4 {
		panic("compression is invalid")
	}
	conf.Compression = compression
}
// kafkaConfigName reports the file name the kafka configuration is read from.
func kafkaConfigName() string {
	const fileName = "kafka.json"
	return fileName
}