init modelRT show demo

This commit is contained in:
douxu 2024-12-16 15:37:44 +08:00
parent 375655017e
commit a611c08c20
1 changed files with 32 additions and 0 deletions

View File

@ -3,6 +3,10 @@ package subscription
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"modelRT/log"
@ -21,6 +25,7 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t
logger := log.GetLoggerInstance()
// setup a channel to listen for interrupt signals
// TODO 将中断信号放到入参中
interrupt := make(chan struct{}, 1)
// read message (-1 means wait indefinitely)
@ -63,4 +68,31 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t
logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err))
}
}
consumer.SubscribeTopics(topics, nil)
// 捕获中断信号以便优雅关闭
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// 消费消息
for {
select {
case sig := <-signals:
fmt.Printf("Interrupt signal (%s) received, stopping consumers...\n", sig)
return
case ev := <-consumer.Events():
switch e := ev.(type) {
case kafka.AssignedPartitions:
fmt.Printf("Assigned partitions: %v\n", e.Partitions)
case kafka.RevokedPartitions:
fmt.Printf("Revoked partitions: %v\n", e.Partitions)
case *kafka.Message:
fmt.Printf("Consumed message: %s from %v [%d] at offset %v\n",
string(e.Value), e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset)
}
}
}
// var client http.Client
// client.Do()
}