diff --git a/config/config.go b/config/config.go index b9968ec..23ec548 100644 --- a/config/config.go +++ b/config/config.go @@ -16,11 +16,12 @@ type BaseConfig struct { // KafkaConfig define config stuct of kafka config type KafkaConfig struct { - Servers string `mapstructure:"Servers"` - GroupID string `mapstructure:"group_id"` - Topic string `mapstructure:"topic"` - AutoOffsetReset string `mapstructure:"auto_offset_reset"` - EnableAutoCommit string `mapstructure:"enable_auto_commit"` + Servers string `mapstructure:"Servers"` + GroupID string `mapstructure:"group_id"` + Topic string `mapstructure:"topic"` + AutoOffsetReset string `mapstructure:"auto_offset_reset"` + EnableAutoCommit string `mapstructure:"enable_auto_commit"` + ReadMessageTimeDuration string `mapstructure:"read_message_time_duration"` } // PostgresConfig define config stuct of postgres config diff --git a/config/config.yaml b/config/config.yaml index 4d93171..b5071be 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -12,6 +12,7 @@ kafka: topic: "" auto_offset_reset: "earliest" enable_auto_commit: "false" + read_message_time_duration: "0.5s" # influxdb: # host: "localhost" diff --git a/database/postgres_init.go b/database/postgres_init.go index e449e2b..d4edcc8 100644 --- a/database/postgres_init.go +++ b/database/postgres_init.go @@ -16,16 +16,16 @@ var ( _globalPostgresMu sync.RWMutex ) -// PostgresDBClient returns the global PostgresDB client.It's safe for concurrent use. -func PostgresDBClient() *gorm.DB { +// GetPostgresDBClient returns the global PostgresDB client. It's safe for concurrent use.
+func GetPostgresDBClient() *gorm.DB { _globalPostgresMu.RLock() client := _globalPostgresClient _globalPostgresMu.RUnlock() return client } -// GetPostgresDBInstance return instance of PostgresDB client -func GetPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB { +// InitPostgresDBInstance return instance of PostgresDB client +func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB { postgresOnce.Do(func() { _globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI) }) diff --git a/handler/circuit_diagram_load.go b/handler/circuit_diagram_load.go index 71c39b3..eeb6630 100644 --- a/handler/circuit_diagram_load.go +++ b/handler/circuit_diagram_load.go @@ -15,7 +15,7 @@ import ( // CircuitDiagramLoadHandler define circuit diagram load process API func CircuitDiagramLoadHandler(c *gin.Context) { - logger := log.LoggerInstance() + logger := log.GetLoggerInstance() pageID, err := strconv.ParseInt(c.Query("page_id"), 10, 64) if err != nil { logger.Error("get pageID from url param failed", zap.Error(err)) diff --git a/log/init.go b/log/init.go index ebc0f55..c360c2e 100644 --- a/log/init.go +++ b/log/init.go @@ -67,16 +67,16 @@ func initLogger(lCfg config.LoggerConfig) *zap.Logger { return _globalLogger } -// GetLoggerInstance return instance of zap logger -func GetLoggerInstance(lCfg config.LoggerConfig) *zap.Logger { +// InitLoggerInstance return instance of zap logger +func InitLoggerInstance(lCfg config.LoggerConfig) *zap.Logger { once.Do(func() { _globalLogger = initLogger(lCfg) }) return _globalLogger } -// LoggerInstance returns the global logger instance It's safe for concurrent use. -func LoggerInstance() *zap.Logger { +// GetLoggerInstance returns the global logger instance It's safe for concurrent use. 
+func GetLoggerInstance() *zap.Logger { _globalLoggerMu.RLock() logger := _globalLogger _globalLoggerMu.RUnlock() diff --git a/main.go b/main.go index 7318b02..076c0f0 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { modelRTConfig = config.ReadAndInitConfig(*modelRTConfigDir, *modelRTConfigName, *modelRTConfigType) // init postgresDBClient - postgresDBClient = database.GetPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI) + postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI) defer func() { sqlDB, err := postgresDBClient.DB() @@ -55,7 +55,7 @@ func main() { }() // init logger - logger = log.GetLoggerInstance(modelRTConfig.LoggerConfig) + logger = log.InitLoggerInstance(modelRTConfig.LoggerConfig) defer logger.Sync() // init ants pool diff --git a/model/model_parse.go b/model/model_parse.go index 0cb9bad..8df2abf 100644 --- a/model/model_parse.go +++ b/model/model_parse.go @@ -23,7 +23,7 @@ var ParseFunc = func(parseConfig interface{}) { cancelCtx, cancel := context.WithTimeout(modelParseConfig.Context, 5*time.Second) defer cancel() - pgClient := database.PostgresDBClient() + pgClient := database.GetPostgresDBClient() componentKey := modelParseConfig.ComponentInfo.GlobalUUID.String() unmarshalMap, err := diagram.GetComponentMap(componentKey) if err != nil { diff --git a/real-time-data/kafka.go b/real-time-data/kafka.go index efeb1a4..1821ac8 100644 --- a/real-time-data/kafka.go +++ b/real-time-data/kafka.go @@ -3,73 +3,64 @@ package readltimedata import ( "context" - "log" - "os" - "os/signal" - "syscall" "time" + "modelRT/log" + "github.com/confluentinc/confluent-kafka-go/kafka" + "go.uber.org/zap" ) // RealTimeDataComputer continuously processing real-time data from Kafka specified topics -func RealTimeDataComputer(consumerConfig kafka.ConfigMap) { - ctx := context.TODO() - - // Create a new consumer - consumer, err := kafka.NewConsumer(&consumerConfig) - if err != nil { - log.Fatalf("Failed to create consumer: 
%v", err) - } - - // Subscribe to the topic - topics := []string{"my-topic"} - err = consumer.SubscribeTopics(topics, nil) - if err != nil { - log.Fatalf("Failed to subscribe to topics: %v", err) - } - - // Setup a channel to listen for interrupt signals - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) - - // Context for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) +func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) { + // context for graceful shutdown + ctx, cancel := context.WithCancel(ctx) defer cancel() - // Start a goroutine to handle shutdown + // get a logger + logger := log.GetLoggerInstance() + + // setup a channel to listen for interrupt signals + interrupt := make(chan struct{}, 1) + + // read message (-1 means wait indefinitely) + timeoutDuration, err := time.ParseDuration(duration) + + // create a new consumer + consumer, err := kafka.NewConsumer(&consumerConfig) + if err != nil { + logger.Error("init kafka consume by config failed", zap.Any("config", consumerConfig), zap.Error(err)) + } + + // subscribe to the topic + err = consumer.SubscribeTopics(topics, nil) + if err != nil { + logger.Error("subscribe to the topic failed", zap.Strings("topic", topics), zap.Error(err)) + } + + // start a goroutine to handle shutdown go func() { <-interrupt - log.Println("Interrupt signal received, stopping consumer...") cancel() consumer.Close() }() - // Continuously read messages from Kafka + // continuously read messages from Kafka for { - // Read message (-1 means wait indefinitely) - duration := time.Duration(1 * time.Second) - msg, err := consumer.ReadMessage(duration) + msg, err := consumer.ReadMessage(timeoutDuration) if err != nil { - // Handle errors (e.g., context canceled on interrupt) if ctx.Err() == context.Canceled { - log.Println("Context canceled, stopping read loop") + logger.Info("context canceled, stopping 
read loop") break } - log.Printf("Consumer error: %v (%v)\n", err, msg) + logger.Error("consumer read message failed", zap.Error(err)) continue } - // Print message to stdout - // fmt.Printf("Received message: %s from %s [%d] at %v\n", - // msg.Value(), msg.TopicPartition(), msg.Partition(), msg.Timestamp()) - - // // Commit the offset manually - // err = consumer.CommitMessage(msg) - // if err != nil { - // log.Printf("Failed to commit message: %v", err) - // } + // TODO 使用 ants.pool处理 kafka 的订阅数据 + _, err = consumer.CommitMessage(msg) + if err != nil { + logger.Error("manual submission information failed", zap.Any("message", msg), zap.Error(err)) + } } - - log.Println("Consumer stopped") }