modelRT/real-time-data/real_time_data_computing.go

207 lines
6.0 KiB
Go
Raw Normal View History

// Package realtimedata defines real-time data operation functions.
package realtimedata
import (
"context"
"encoding/json"
"fmt"
"time"
"modelRT/config"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
"modelRT/pool"
"modelRT/util"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// RealTimeDataChan is the channel on which real time data receive requests
// are handed to this package. ReceiveChan reads from it in its main loop, and
// the package init function allocates it with a buffer of 100 requests.
var RealTimeDataChan chan network.RealTimeDataReceiveRequest
// init allocates RealTimeDataChan as a buffered channel so that it is usable
// as soon as the package has been loaded.
func init() {
	// Number of requests that can be queued before a sender blocks.
	const requestBufferSize = 100

	RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, requestBufferSize)
}
// ReceiveChan receives and processes real time data until ctx is cancelled.
//
// It serves two sources from a single loop:
//   - requests delivered on RealTimeDataChan, which are converted into a
//     config.AnchorParamConfig and sent to the component's anchor channel;
//   - messages read from the subscribed Kafka topics, which are collected into
//     a batch, handed to processMessageBatch and then committed.
//
// A Kafka batch is flushed when it holds batchSize messages, when batchTimeout
// has elapsed since the previous commit, or when a read times out while
// messages are still pending.
//
// Parameters:
//   - ctx: cancelling it ends the loop; it is also passed to logging and to
//     the downstream calls.
//   - consumerConfig: configuration used to create the Kafka consumer.
//   - topics: Kafka topics to subscribe to.
//   - duration: converted with util.SecondsToDuration and used both as the
//     Kafka read timeout and as the maximum time between commits.
//
// Nothing is returned: a failure to create the consumer or to subscribe is
// logged and ends the call; later failures are logged and the loop continues.
func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
	consumer, err := kafka.NewConsumer(consumerConfig)
	if err != nil {
		logger.Error(ctx, "create kafka consumer failed", "error", err)
		return
	}
	defer consumer.Close()

	err = consumer.SubscribeTopics(topics, nil)
	if err != nil {
		logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err)
		return
	}

	const batchSize = 100
	batchTimeout := util.SecondsToDuration(duration)
	messages := make([]*kafka.Message, 0, batchSize)
	lastCommit := time.Now()

	// flushBatch processes the pending messages, commits their offsets and
	// starts a new batch window. A failed commit is logged instead of being
	// silently dropped; the batch is still cleared because its messages have
	// already been handed to processMessageBatch.
	flushBatch := func() {
		processMessageBatch(ctx, messages)
		if _, commitErr := consumer.Commit(); commitErr != nil {
			logger.Error(ctx, "commit kafka offsets failed", "error", commitErr)
		}
		messages = messages[:0]
		lastCommit = time.Now()
	}

	logger.Info(ctx, "start consuming from kafka", "topic", topics)
	for {
		select {
		case <-ctx.Done():
			return
		case realTimeData := <-RealTimeDataChan:
			componentUUID := realTimeData.PayLoad.ComponentUUID
			component, err := diagram.GetComponentMap(componentUUID)
			if err != nil {
				logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err)
				continue
			}

			componentType := component.Type
			if componentType != constants.DemoType {
				logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID)
				continue
			}

			// These values are never assigned here, so the anchor config is
			// built with their zero values (empty name, 0 limits, nil func).
			var anchorName string
			var compareValUpperLimit, compareValLowerLimit float64
			var anchorRealTimeData []float64
			var calculateFunc func(anchorValue float64, args ...float64) float64
			// calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)

			for _, param := range realTimeData.PayLoad.Values {
				anchorRealTimeData = append(anchorRealTimeData, param.Value)
			}

			anchorConfig := config.AnchorParamConfig{
				AnchorParamBaseConfig: config.AnchorParamBaseConfig{
					ComponentUUID:        componentUUID,
					AnchorName:           anchorName,
					CompareValUpperLimit: compareValUpperLimit,
					CompareValLowerLimit: compareValLowerLimit,
					AnchorRealTimeData:   anchorRealTimeData,
				},
				CalculateFunc:   calculateFunc,
				CalculateParams: []float64{},
			}

			anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
			if err != nil {
				logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err)
				continue
			}

			// Do not block forever on a full anchor channel: give up as soon
			// as the context is cancelled so the consumer can be closed.
			select {
			case anchorChan <- anchorConfig:
			case <-ctx.Done():
				return
			}
		default:
			msg, err := consumer.ReadMessage(batchTimeout)
			if err != nil {
				// Checked assertion: an error that is not a kafka.Error must
				// be logged below rather than cause a panic.
				if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
					// On a read timeout, process whatever has accumulated.
					if len(messages) > 0 {
						flushBatch()
					}
					continue
				}
				logger.Error(ctx, "read message from kafka failed", "error", err)
				continue
			}

			messages = append(messages, msg)
			// Flush once the batch is full or the commit interval has elapsed.
			if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
				flushBatch()
			}
		}
	}
}
// RealTimeDataPayload is the body of a real time data message decoded from
// Kafka. The struct has no json tags, so encoding/json matches incoming keys
// against the field names themselves.
type RealTimeDataPayload struct {
// ComponentUUID identifies the component; processRealTimeData uses it to
// look the component up and to obtain its anchor channel.
ComponentUUID string
// Values are the real time samples; processRealTimeData copies them into
// the anchor config's AnchorRealTimeData.
Values []float64
}
// RealTimeData is the top-level structure that parseKafkaMessage decodes a
// Kafka message value into.
type RealTimeData struct {
// Payload holds the component identifier and its real time values.
Payload RealTimeDataPayload
}
// parseKafkaMessage decodes the JSON value of a Kafka message into a
// RealTimeData. It returns a nil result together with a wrapped error when the
// bytes are not valid JSON for that structure.
func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
	decoded := new(RealTimeData)
	if unmarshalErr := json.Unmarshal(msgValue, decoded); unmarshalErr != nil {
		return nil, fmt.Errorf("json unmarshal failed: %w", unmarshalErr)
	}
	return decoded, nil
}
// processRealTimeData turns one decoded real time data message into a
// config.AnchorParamConfig and sends it to the anchor channel of the
// message's component.
//
// The message is dropped, with an error log, when the component cannot be
// found in the diagram map, when its type is not constants.DemoType, or when
// no anchor channel can be obtained for it. The send itself gives up when ctx
// is cancelled or after 5 seconds, whichever comes first.
//
// Parameters:
//   - ctx: used for logging, for obtaining the anchor channel and to abort a
//     pending send.
//   - realTimeData: the decoded message; it must not be nil.
func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
	componentUUID := realTimeData.Payload.ComponentUUID
	component, err := diagram.GetComponentMap(componentUUID)
	if err != nil {
		logger.Error(ctx, "query component info from diagram map by component id failed",
			"component_uuid", componentUUID, "error", err)
		return
	}

	componentType := component.Type
	if componentType != constants.DemoType {
		logger.Error(ctx, "can not process real time data of component type not equal DemoType",
			"component_uuid", componentUUID)
		return
	}

	// These values are never assigned here, so the anchor config is built
	// with their zero values (empty name, 0 limits, nil func).
	var anchorName string
	var compareValUpperLimit, compareValLowerLimit float64
	var calculateFunc func(anchorValue float64, args ...float64) float64

	// Collect the real time data into a slice of its own so the config does
	// not alias the decoded message; the result stays nil when there are no
	// values.
	anchorRealTimeData := append([]float64(nil), realTimeData.Payload.Values...)

	anchorConfig := config.AnchorParamConfig{
		AnchorParamBaseConfig: config.AnchorParamBaseConfig{
			ComponentUUID:        componentUUID,
			AnchorName:           anchorName,
			CompareValUpperLimit: compareValUpperLimit,
			CompareValLowerLimit: compareValLowerLimit,
			AnchorRealTimeData:   anchorRealTimeData,
		},
		CalculateFunc:   calculateFunc,
		CalculateParams: []float64{},
	}

	anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
	if err != nil {
		logger.Error(ctx, "get anchor param chan failed",
			"component_uuid", componentUUID, "error", err)
		return
	}

	// An explicit timer that is stopped on return releases its resources as
	// soon as the send completes, instead of lingering for the full timeout
	// on every message as a time.After timer would.
	sendTimer := time.NewTimer(5 * time.Second)
	defer sendTimer.Stop()

	// Use select so a full anchor channel cannot block the caller forever.
	select {
	case anchorChan <- anchorConfig:
		// Sent successfully.
	case <-ctx.Done():
		logger.Info(ctx, "context done while sending to anchor chan")
	case <-sendTimer.C:
		logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID)
	}
}
// processMessageBatch decodes every Kafka message in the batch, in order, and
// passes each successfully decoded one to processRealTimeData. A message that
// fails to decode is logged and skipped; it does not stop the rest of the
// batch.
func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
	for idx := 0; idx < len(messages); idx++ {
		parsed, parseErr := parseKafkaMessage(messages[idx].Value)
		if parseErr == nil {
			processRealTimeData(ctx, parsed)
			continue
		}
		logger.Error(ctx, "parse kafka message failed", "error", parseErr)
	}
}