diff --git a/real-time-data/kafka.go b/real-time-data/kafka.go index 409f18c..2843630 100644 --- a/real-time-data/kafka.go +++ b/real-time-data/kafka.go @@ -3,6 +3,10 @@ package subscription import ( "context" + "fmt" + "os" + "os/signal" + "syscall" "time" "modelRT/log" @@ -21,6 +25,7 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t logger := log.GetLoggerInstance() // setup a channel to listen for interrupt signals + // TODO 将中断信号放到入参中 interrupt := make(chan struct{}, 1) // read message (-1 means wait indefinitely) @@ -63,4 +68,31 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err)) } } + + consumer.SubscribeTopics(topics, nil) + + // 捕获中断信号以便优雅关闭 + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + // 消费消息 + for { + select { + case sig := <-signals: + fmt.Printf("Interrupt signal (%s) received, stopping consumers...\n", sig) + return + case ev := <-consumer.Events(): + switch e := ev.(type) { + case kafka.AssignedPartitions: + fmt.Printf("Assigned partitions: %v\n", e.Partitions) + case kafka.RevokedPartitions: + fmt.Printf("Revoked partitions: %v\n", e.Partitions) + case *kafka.Message: + fmt.Printf("Consumed message: %s from %v [%d] at offset %v\n", + string(e.Value), e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset) + } + } + } + // var client http.Client + // client.Do() }