modelRT/real-time-data/kafka.go

64 lines
1.7 KiB
Go

// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"time"
"modelRT/logger"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
// context for graceful shutdown
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// setup a channel to listen for interrupt signals
// TODO 将中断信号放到入参中
interrupt := make(chan struct{}, 1)
// read message (-1 means wait indefinitely)
timeoutDuration, err := time.ParseDuration(duration)
// create a new consumer
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
logger.Error(ctx, "init kafka consume by config failed", "config", consumerConfig, "error", err)
}
// subscribe to the topic
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
logger.Error(ctx, "subscribe to the topic failed", "topic", topics, "error", err)
}
// start a goroutine to handle shutdown
go func() {
<-interrupt
cancel()
consumer.Close()
}()
// continuously read messages from Kafka
for {
msg, err := consumer.ReadMessage(timeoutDuration)
if err != nil {
if ctx.Err() == context.Canceled {
logger.Info(ctx, "context canceled, stopping read loop")
break
}
logger.Error(ctx, "consumer read message failed", "error", err)
continue
}
// TODO 使用 ants.pool处理 kafka 的订阅数据
_, err = consumer.CommitMessage(msg)
if err != nil {
logger.Error(ctx, "manual submission information failed", "message", msg, "error", err)
}
}
}