// File: modelRT/real-time-data/kafka.go (68 lines, 1.8 KiB, Go)

// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"time"
"modelRT/logger"
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
// RealTimeDataComputer continuously consumes messages from the specified
// Kafka topics and commits each one manually, until ctx is canceled.
//
// Parameters:
//   ctx            - controls the lifetime of the consume loop.
//   consumerConfig - confluent-kafka-go consumer configuration.
//   topics         - list of topics to subscribe to.
//   duration       - per-read timeout, parsed with time.ParseDuration
//                    (e.g. "5s"); a negative duration means wait
//                    indefinitely, per the confluent-kafka-go API.
//
// Errors during setup (bad duration, consumer creation, subscription) are
// logged and abort the function; per-message errors are logged and skipped.
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
	// Derive a cancellable context so the read loop stops cleanly on shutdown.
	// TODO: accept an external interrupt/shutdown signal as a parameter.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logger := logger.GetLoggerInstance()

	// Parse the per-read timeout up front and check it explicitly; in the
	// original code this error was silently overwritten by the consumer
	// creation below, so a malformed duration went undetected.
	timeoutDuration, err := time.ParseDuration(duration)
	if err != nil {
		logger.Error("parse read timeout duration failed", zap.String("duration", duration), zap.Error(err))
		return
	}

	// Create the consumer. Without one there is nothing to read, so return
	// instead of falling through and dereferencing a nil consumer.
	consumer, err := kafka.NewConsumer(&consumerConfig)
	if err != nil {
		logger.Error("init kafka consume by config failed", zap.Any("config", consumerConfig), zap.Error(err))
		return
	}
	// Always release the consumer, whichever path exits the loop. This
	// replaces the original shutdown goroutine, which waited on a channel
	// that nothing ever signalled and therefore leaked.
	defer func() {
		if cerr := consumer.Close(); cerr != nil {
			logger.Error("close kafka consumer failed", zap.Error(cerr))
		}
	}()

	if err = consumer.SubscribeTopics(topics, nil); err != nil {
		logger.Error("subscribe to the topic failed", zap.Strings("topic", topics), zap.Error(err))
		return
	}

	// Continuously read messages from Kafka until the context is canceled.
	for {
		// Check for cancellation before each blocking read so shutdown is
		// observed within one timeout window at worst.
		select {
		case <-ctx.Done():
			logger.Info("context canceled, stopping read loop")
			return
		default:
		}

		msg, err := consumer.ReadMessage(timeoutDuration)
		if err != nil {
			if ctx.Err() != nil {
				logger.Info("context canceled, stopping read loop")
				return
			}
			// NOTE(review): read timeouts also surface here as errors and are
			// logged each cycle — consider filtering kafka.ErrTimedOut.
			logger.Error("consumer read message failed", zap.Error(err))
			continue
		}

		// TODO: dispatch message processing to an ants worker pool.
		if _, err = consumer.CommitMessage(msg); err != nil {
			logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err))
		}
	}
}