initialize real-time data reading framework

This commit is contained in:
douxu 2024-11-28 15:29:34 +08:00
parent 65e2969ffb
commit 16220a6dd7
8 changed files with 56 additions and 63 deletions

View File

@ -16,11 +16,12 @@ type BaseConfig struct {
// KafkaConfig define config stuct of kafka config
type KafkaConfig struct {
Servers string `mapstructure:"Servers"`
GroupID string `mapstructure:"group_id"`
Topic string `mapstructure:"topic"`
AutoOffsetReset string `mapstructure:"auto_offset_reset"`
EnableAutoCommit string `mapstructure:"enable_auto_commit"`
Servers string `mapstructure:"Servers"`
GroupID string `mapstructure:"group_id"`
Topic string `mapstructure:"topic"`
AutoOffsetReset string `mapstructure:"auto_offset_reset"`
EnableAutoCommit string `mapstructure:"enable_auto_commit"`
ReadMessageTimeDuration string `mapstructure:"read_message_time_duration"`
}
// PostgresConfig define config stuct of postgres config

View File

@ -12,6 +12,7 @@ kafka:
topic: ""
auto_offset_reset: "earliest"
enable_auto_commit: "false"
read_message_time_duration: ”0.5s"
# influxdb:
# host: "localhost"

View File

@ -16,16 +16,16 @@ var (
_globalPostgresMu sync.RWMutex
)
// PostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
func PostgresDBClient() *gorm.DB {
// GetPostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
func GetPostgresDBClient() *gorm.DB {
_globalPostgresMu.RLock()
client := _globalPostgresClient
_globalPostgresMu.RUnlock()
return client
}
// GetPostgresDBInstance return instance of PostgresDB client
func GetPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
// InitPostgresDBInstance return instance of PostgresDB client
func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
postgresOnce.Do(func() {
_globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI)
})

View File

@ -15,7 +15,7 @@ import (
// CircuitDiagramLoadHandler define circuit diagram load process API
func CircuitDiagramLoadHandler(c *gin.Context) {
logger := log.LoggerInstance()
logger := log.GetLoggerInstance()
pageID, err := strconv.ParseInt(c.Query("page_id"), 10, 64)
if err != nil {
logger.Error("get pageID from url param failed", zap.Error(err))

View File

@ -67,16 +67,16 @@ func initLogger(lCfg config.LoggerConfig) *zap.Logger {
return _globalLogger
}
// GetLoggerInstance return instance of zap logger
func GetLoggerInstance(lCfg config.LoggerConfig) *zap.Logger {
// InitLoggerInstance return instance of zap logger
func InitLoggerInstance(lCfg config.LoggerConfig) *zap.Logger {
once.Do(func() {
_globalLogger = initLogger(lCfg)
})
return _globalLogger
}
// LoggerInstance returns the global logger instance It's safe for concurrent use.
func LoggerInstance() *zap.Logger {
// GetLoggerInstance returns the global logger instance It's safe for concurrent use.
func GetLoggerInstance() *zap.Logger {
_globalLoggerMu.RLock()
logger := _globalLogger
_globalLoggerMu.RUnlock()

View File

@ -44,7 +44,7 @@ func main() {
modelRTConfig = config.ReadAndInitConfig(*modelRTConfigDir, *modelRTConfigName, *modelRTConfigType)
// init postgresDBClient
postgresDBClient = database.GetPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
defer func() {
sqlDB, err := postgresDBClient.DB()
@ -55,7 +55,7 @@ func main() {
}()
// init logger
logger = log.GetLoggerInstance(modelRTConfig.LoggerConfig)
logger = log.InitLoggerInstance(modelRTConfig.LoggerConfig)
defer logger.Sync()
// init ants pool

View File

@ -23,7 +23,7 @@ var ParseFunc = func(parseConfig interface{}) {
cancelCtx, cancel := context.WithTimeout(modelParseConfig.Context, 5*time.Second)
defer cancel()
pgClient := database.PostgresDBClient()
pgClient := database.GetPostgresDBClient()
componentKey := modelParseConfig.ComponentInfo.GlobalUUID.String()
unmarshalMap, err := diagram.GetComponentMap(componentKey)
if err != nil {

View File

@ -3,73 +3,64 @@ package readltimedata
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"modelRT/log"
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
)
// RealTimeDataComputer continuously processing real-time data from Kafka specified topics
func RealTimeDataComputer(consumerConfig kafka.ConfigMap) {
ctx := context.TODO()
// Create a new consumer
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
// Subscribe to the topic
topics := []string{"my-topic"}
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topics: %v", err)
}
// Setup a channel to listen for interrupt signals
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) {
// context for graceful shutdown
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Start a goroutine to handle shutdown
// get a logger
logger := log.GetLoggerInstance()
// setup a channel to listen for interrupt signals
interrupt := make(chan struct{}, 1)
// read message (-1 means wait indefinitely)
timeoutDuration, err := time.ParseDuration(duration)
// create a new consumer
consumer, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
logger.Error("init kafka consume by config failed", zap.Any("config", consumerConfig), zap.Error(err))
}
// subscribe to the topic
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
logger.Error("subscribe to the topic failed", zap.Strings("topic", topics), zap.Error(err))
}
// start a goroutine to handle shutdown
go func() {
<-interrupt
log.Println("Interrupt signal received, stopping consumer...")
cancel()
consumer.Close()
}()
// Continuously read messages from Kafka
// continuously read messages from Kafka
for {
// Read message (-1 means wait indefinitely)
duration := time.Duration(1 * time.Second)
msg, err := consumer.ReadMessage(duration)
msg, err := consumer.ReadMessage(timeoutDuration)
if err != nil {
// Handle errors (e.g., context canceled on interrupt)
if ctx.Err() == context.Canceled {
log.Println("Context canceled, stopping read loop")
logger.Info("context canceled, stopping read loop")
break
}
log.Printf("Consumer error: %v (%v)\n", err, msg)
logger.Error("consumer read message failed", zap.Error(err))
continue
}
// Print message to stdout
// fmt.Printf("Received message: %s from %s [%d] at %v\n",
// msg.Value(), msg.TopicPartition(), msg.Partition(), msg.Timestamp())
// // Commit the offset manually
// err = consumer.CommitMessage(msg)
// if err != nil {
// log.Printf("Failed to commit message: %v", err)
// }
// TODO 使用 ants.pool处理 kafka 的订阅数据
_, err = consumer.CommitMessage(msg)
if err != nil {
logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err))
}
}
log.Println("Consumer stopped")
}