modelRT/real-time-data/kafka.go

67 lines
1.7 KiB
Go
Raw Normal View History

// Package subscription define real time data operation functions
package subscription
2024-11-28 11:46:40 +08:00
import (
"context"
"time"
"modelRT/log"
2024-11-28 11:46:40 +08:00
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
2024-11-28 11:46:40 +08:00
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
// context for graceful shutdown
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// get a logger
logger := log.GetLoggerInstance()
2024-11-28 11:46:40 +08:00
// setup a channel to listen for interrupt signals
interrupt := make(chan struct{}, 1)
// read message (-1 means wait indefinitely)
timeoutDuration, err := time.ParseDuration(duration)
// create a new consumer
2024-11-28 11:46:40 +08:00
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
logger.Error("init kafka consume by config failed", zap.Any("config", consumerConfig), zap.Error(err))
2024-11-28 11:46:40 +08:00
}
// subscribe to the topic
2024-11-28 11:46:40 +08:00
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
logger.Error("subscribe to the topic failed", zap.Strings("topic", topics), zap.Error(err))
2024-11-28 11:46:40 +08:00
}
// start a goroutine to handle shutdown
2024-11-28 11:46:40 +08:00
go func() {
<-interrupt
cancel()
consumer.Close()
}()
// continuously read messages from Kafka
2024-11-28 11:46:40 +08:00
for {
msg, err := consumer.ReadMessage(timeoutDuration)
2024-11-28 11:46:40 +08:00
if err != nil {
if ctx.Err() == context.Canceled {
logger.Info("context canceled, stopping read loop")
2024-11-28 11:46:40 +08:00
break
}
logger.Error("consumer read message failed", zap.Error(err))
2024-11-28 11:46:40 +08:00
continue
}
// TODO 使用 ants.pool处理 kafka 的订阅数据
_, err = consumer.CommitMessage(msg)
if err != nil {
logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err))
}
2024-11-28 11:46:40 +08:00
}
}