// Package realtimedata defines real-time data operation functions.
package realtimedata

import (
	"context"
	"time"

	"modelRT/logger"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// RealTimeDataComputer continuously processes real-time data from the specified Kafka topics.
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
	// context for graceful shutdown
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// set up a channel to listen for interrupt signals
	// TODO: pass the interrupt signal in as a parameter
	interrupt := make(chan struct{}, 1)

	// parse the read timeout (a negative value means wait indefinitely)
	timeoutDuration, err := time.ParseDuration(duration)
	if err != nil {
		logger.Error(ctx, "parse read timeout duration failed", "duration", duration, "error", err)
		return
	}

	// create a new consumer
	consumer, err := kafka.NewConsumer(&consumerConfig)
	if err != nil {
		logger.Error(ctx, "init kafka consumer by config failed", "config", consumerConfig, "error", err)
		return
	}

	// subscribe to the topics
	err = consumer.SubscribeTopics(topics, nil)
	if err != nil {
		logger.Error(ctx, "subscribe to the topics failed", "topics", topics, "error", err)
		consumer.Close()
		return
	}

	// start a goroutine to handle shutdown
	go func() {
		<-interrupt
		cancel()
		consumer.Close()
	}()

	// continuously read messages from Kafka
	for {
		msg, err := consumer.ReadMessage(timeoutDuration)
		if err != nil {
			if ctx.Err() == context.Canceled {
				logger.Info(ctx, "context canceled, stopping read loop")
				break
			}
			logger.Error(ctx, "consumer read message failed", "error", err)
			continue
		}

		// TODO: process the subscribed Kafka messages with an ants.Pool

		// commit the offset manually after the message has been handled
		_, err = consumer.CommitMessage(msg)
		if err != nil {
			logger.Error(ctx, "manual offset commit failed", "message", msg, "error", err)
		}
	}
}
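
// Usage sketch (not part of the original file): one way RealTimeDataComputer
// might be invoked. The broker address, group id, topic name, and timeout below
// are hypothetical placeholders; enable.auto.commit is disabled because offsets
// are committed manually in the read loop above.
//
//	cfg := kafka.ConfigMap{
//		"bootstrap.servers":  "localhost:9092", // hypothetical broker
//		"group.id":           "realtime-data",  // hypothetical consumer group
//		"auto.offset.reset":  "earliest",
//		"enable.auto.commit": false,
//	}
//	RealTimeDataComputer(context.Background(), cfg, []string{"telemetry"}, "10s")
//
// For the ants.Pool TODO above, a minimal sketch (assuming the
// github.com/panjf2000/ants/v2 package and a hypothetical handleMessage
// function) could submit each message to a worker pool instead of processing
// it inline:
//
//	// before the read loop:
//	pool, poolErr := ants.NewPool(8) // hypothetical pool size
//	if poolErr != nil {
//		logger.Error(ctx, "create worker pool failed", "error", poolErr)
//		return
//	}
//	defer pool.Release()
//
//	// inside the read loop, in place of inline processing:
//	m := msg // capture the current message for the closure
//	if submitErr := pool.Submit(func() { handleMessage(m) }); submitErr != nil {
//		logger.Error(ctx, "submit message to worker pool failed", "error", submitErr)
//	}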