modelRT/real-time-data/kafka.go

76 lines
1.9 KiB
Go
Raw Normal View History

2024-11-28 11:46:40 +08:00
// Package readltimedata define real time data operation functions
package readltimedata
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
func RealTimeDataComputer(consumerConfig kafka.ConfigMap) {
ctx := context.TODO()
// Create a new consumer
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
// Subscribe to the topic
topics := []string{"my-topic"}
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topics: %v", err)
}
// Setup a channel to listen for interrupt signals
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start a goroutine to handle shutdown
go func() {
<-interrupt
log.Println("Interrupt signal received, stopping consumer...")
cancel()
consumer.Close()
}()
// Continuously read messages from Kafka
for {
// Read message (-1 means wait indefinitely)
duration := time.Duration(1 * time.Second)
msg, err := consumer.ReadMessage(duration)
if err != nil {
// Handle errors (e.g., context canceled on interrupt)
if ctx.Err() == context.Canceled {
log.Println("Context canceled, stopping read loop")
break
}
log.Printf("Consumer error: %v (%v)\n", err, msg)
continue
}
// Print message to stdout
// fmt.Printf("Received message: %s from %s [%d] at %v\n",
// msg.Value(), msg.TopicPartition(), msg.Partition(), msg.Timestamp())
// // Commit the offset manually
// err = consumer.CommitMessage(msg)
// if err != nil {
// log.Printf("Failed to commit message: %v", err)
// }
}
log.Println("Consumer stopped")
}