modelRT/real-time-data/kafka.go

64 lines
1.7 KiB
Go
Raw Normal View History

2024-12-18 16:25:49 +08:00
// Package realtimedata defines functions for operating on real-time data.
package realtimedata
2024-11-28 11:46:40 +08:00
import (
"context"
"time"
2024-12-25 16:34:57 +08:00
"modelRT/logger"
2024-11-28 11:46:40 +08:00
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
// context for graceful shutdown
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// setup a channel to listen for interrupt signals
2024-12-16 15:37:44 +08:00
// TODO 将中断信号放到入参中
interrupt := make(chan struct{}, 1)
// read message (-1 means wait indefinitely)
timeoutDuration, err := time.ParseDuration(duration)
// create a new consumer
2024-11-28 11:46:40 +08:00
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
logger.Error(ctx, "init kafka consume by config failed", "config", consumerConfig, "error", err)
2024-11-28 11:46:40 +08:00
}
// subscribe to the topic
2024-11-28 11:46:40 +08:00
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
logger.Error(ctx, "subscribe to the topic failed", "topic", topics, "error", err)
2024-11-28 11:46:40 +08:00
}
// start a goroutine to handle shutdown
2024-11-28 11:46:40 +08:00
go func() {
<-interrupt
cancel()
consumer.Close()
}()
// continuously read messages from Kafka
2024-11-28 11:46:40 +08:00
for {
msg, err := consumer.ReadMessage(timeoutDuration)
2024-11-28 11:46:40 +08:00
if err != nil {
if ctx.Err() == context.Canceled {
logger.Info(ctx, "context canceled, stopping read loop")
2024-11-28 11:46:40 +08:00
break
}
logger.Error(ctx, "consumer read message failed", "error", err)
2024-11-28 11:46:40 +08:00
continue
}
// TODO 使用 ants.pool处理 kafka 的订阅数据
_, err = consumer.CommitMessage(msg)
if err != nil {
logger.Error(ctx, "manual submission information failed", "message", msg, "error", err)
}
2024-11-28 11:46:40 +08:00
}
}