diff --git a/.drone.yml b/.drone.yml index 344cd56..97a6cda 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,4 +9,4 @@ steps: GO111MODULE: on GOPROXY: https://goproxy.cn,direct commands: - - go build + - go build main.go diff --git a/real-time-data/kafka.go b/real-time-data/kafka.go index c6a6551..1cfa31f 100644 --- a/real-time-data/kafka.go +++ b/real-time-data/kafka.go @@ -3,10 +3,6 @@ package realtimedata import ( "context" - "fmt" - "os" - "os/signal" - "syscall" "time" "modelRT/logger" @@ -68,31 +64,4 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err)) } } - - consumer.SubscribeTopics(topics, nil) - - // 捕获中断信号以便优雅关闭 - signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - - // 消费消息 - for { - select { - case sig := <-signals: - fmt.Printf("Interrupt signal (%s) received, stopping consumers...\n", sig) - return - case ev := <-consumer.Events(): - switch e := ev.(type) { - case kafka.AssignedPartitions: - fmt.Printf("Assigned partitions: %v\n", e.Partitions) - case kafka.RevokedPartitions: - fmt.Printf("Revoked partitions: %v\n", e.Partitions) - case *kafka.Message: - fmt.Printf("Consumed message: %s from %v [%d] at offset %v\n", - string(e.Value), e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset) - } - } - } - // var client http.Client - // client.Do() }