// Package realtimedata defines real-time data operation functions
package realtimedata

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"modelRT/config"
	"modelRT/constants"
	"modelRT/diagram"
	"modelRT/logger"
	"modelRT/model"
	"modelRT/network"
	"modelRT/orm"
	"modelRT/pool"
	"modelRT/util"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	// RealTimeDataChan defines the channel for receiving real-time data requests
	RealTimeDataChan chan network.RealTimeDataReceiveRequest

	// globalComputeState tracks the running compute config of each measurement,
	// keyed by measurement UUID
	globalComputeState *MeasComputeState
)

func init() {
	RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
	globalComputeState = NewMeasComputeState()
}

// StartRealTimeDataComputing starts a real time data computing goroutine for each enabled measurement
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
	for _, measurement := range measurements {
		enableValue, exist := measurement.EventPlan["enable"]
		if !exist {
			logger.Info(ctx, "measurement object does not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}

		enable, ok := enableValue.(bool)
		if !ok {
			logger.Error(ctx, "convert enable field to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
			continue
		}

		if !enable {
			logger.Info(ctx, "measurement object does not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}

		conf, err := initComputeConfig(measurement)
		if err != nil {
			logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
			continue
		}

		if conf == nil {
			logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}

		uuidStr := measurement.ComponentUUID.String()
		enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
		conf.StopGchan = make(chan struct{})
		globalComputeState.Store(uuidStr, conf)
		logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
		go continuousComputation(enrichedCtx, conf)
	}
}

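// initComputeConfig builds a ComputeConfig from the measurement's EventPlan.
// It returns (nil, nil) when the enable flag is absent or false, and an error
// when a required field is missing or has the wrong type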
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
	var err error

	enableValue, exist := measurement.EventPlan["enable"]
	if !exist {
		return nil, nil
	}

	enable, ok := enableValue.(bool)
	if !ok {
		return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
	}

	if !enable {
		return nil, nil
	}

	conf := &ComputeConfig{}

	causeValue, exist := measurement.EventPlan["cause"]
	if !exist {
		return nil, errors.New("missing required field cause")
	}

	cause, ok := causeValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
	}

	conf.Cause, err = processCauseMap(cause)
	if err != nil {
		return nil, fmt.Errorf("parse content of field cause failed: %w", err)
	}

	actionValue, exist := measurement.EventPlan["action"]
	if !exist {
		return nil, errors.New("missing required field action")
	}

	action, ok := actionValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
	}
	conf.Action = action

	queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
	if err != nil {
		return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
	}
	conf.QueryKey = queryKey
	conf.DataSize = int64(measurement.Size)

	// TODO use constant values for temporary settings
	conf.minBreachCount = constants.MinBreachCount

	isFloatCause := false
	for _, key := range []string{"up", "down", "upup", "downdown"} {
		if _, exists := conf.Cause[key]; exists {
			isFloatCause = true
			break
		}
	}

	if isFloatCause {
		// telemetry (TE) thresholds config
		teThresholds, err := parseTEThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
		}
		conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
	} else {
		// telesignal (TI) edge config
		tiThresholds, err := parseTIThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
		}
		conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
	}

	return conf, nil
}

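// processCauseMap validates the cause map and extracts the recognized trigger
// settings: float64 thresholds ("up", "down", "upup", "downdown"), or a string
// "edge" value ("raising" or "falling") when no threshold key is present.
// Illustrative inputs (the actual content comes from measurement.EventPlan):
//
//	{"up": 80.0, "upup": 95.0}  -> threshold cause, later handled by TEAnalyzer
//	{"edge": "raising"}         -> edge cause, later handled by TIAnalyzer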
func processCauseMap(data map[string]any) (map[string]any, error) {
	causeResult := make(map[string]any)
	keysToExtract := []string{"up", "down", "upup", "downdown"}

	var foundFloatKey bool
	for _, key := range keysToExtract {
		if value, exists := data[key]; exists {
			foundFloatKey = true

			// check value type
			floatVal, ok := value.(float64)
			if !ok {
				return nil, fmt.Errorf("key %s exists but type is incorrect: expected float64, actual %T", key, value)
			}
			causeResult[key] = floatVal
		}
	}

	if foundFloatKey {
		return causeResult, nil
	}

	edgeKey := "edge"
	value, exists := data[edgeKey]
	if !exists {
		return nil, fmt.Errorf("key %s does not exist", edgeKey)
	}

	stringVal, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("key %s exists but type is incorrect: expected string, actual %T", edgeKey, value)
	}

	switch stringVal {
	case "raising", "falling":
		causeResult[edgeKey] = stringVal
	default:
		return nil, fmt.Errorf("key %s value is incorrect, actual value %v", edgeKey, value)
	}

	return causeResult, nil
}

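// continuousComputation periodically queries the latest window of real time
// data from redis and hands it to the measurement's analyzer, until stopped
// via conf.StopGchan or the parent context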
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
	client := diagram.NewRedisClient()
	uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)

	duration := util.SecondsToDuration(conf.Duration)
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	startTimestamp := util.GenNanoTsStr()
	for {
		select {
		case <-conf.StopGchan:
			logger.Info(ctx, "continuous computing goroutine stopped by local StopGchan", "uuid", uuid)
			return
		case <-ctx.Done():
			logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
			return
		case <-ticker.C:
			stopTimestamp := util.GenNanoTsStr()
			members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp)
			if err != nil {
				logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
				continue
			}
			startTimestamp = stopTimestamp

			realTimeDatas := util.ConvertZSetMembersToFloat64(ctx, members)
			if conf.Analyzer != nil {
				conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimeDatas)
			} else {
				logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
			}
		}
	}
}

// ReceiveChan consumes real time data from kafka and from RealTimeDataChan and dispatches it for processing
func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
	consumer, err := kafka.NewConsumer(consumerConfig)
	if err != nil {
		logger.Error(ctx, "create kafka consumer failed", "error", err)
		return
	}
	defer consumer.Close()

	err = consumer.SubscribeTopics(topics, nil)
	if err != nil {
		logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err)
		return
	}

	batchSize := 100
	batchTimeout := util.SecondsToDuration(duration)
	messages := make([]*kafka.Message, 0, batchSize)
	lastCommit := time.Now()
	logger.Info(ctx, "start consuming from kafka", "topic", topics)
	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "stop real time data computing by context cancel")
			return
		case realTimeData := <-RealTimeDataChan:
			componentUUID := realTimeData.PayLoad.ComponentUUID
			component, err := diagram.GetComponentMap(componentUUID)
			if err != nil {
				logger.Error(ctx, "query component info from diagram map by component id failed", "component_uuid", componentUUID, "error", err)
				continue
			}

			componentType := component.Type
			if componentType != constants.DemoType {
				logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID)
				continue
			}

			var anchorName string
			var compareValUpperLimit, compareValLowerLimit float64
			var anchorRealTimeData []float64
			var calculateFunc func(anchorValue float64, args ...float64) float64

			// calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)

			for _, param := range realTimeData.PayLoad.Values {
				anchorRealTimeData = append(anchorRealTimeData, param.Value)
			}

			anchorConfig := config.AnchorParamConfig{
				AnchorParamBaseConfig: config.AnchorParamBaseConfig{
					ComponentUUID:        componentUUID,
					AnchorName:           anchorName,
					CompareValUpperLimit: compareValUpperLimit,
					CompareValLowerLimit: compareValLowerLimit,
					AnchorRealTimeData:   anchorRealTimeData,
				},
				CalculateFunc:   calculateFunc,
				CalculateParams: []float64{},
			}
			anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
			if err != nil {
				logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err)
				continue
			}
			anchorChan <- anchorConfig
		default:
			msg, err := consumer.ReadMessage(batchTimeout)
			if err != nil {
				if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
					// process accumulated messages when the read times out
					if len(messages) > 0 {
						processMessageBatch(ctx, messages)
						consumer.Commit()
						messages = messages[:0]
					}
					continue
				}
				logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg)
				continue
			}

			messages = append(messages, msg)
			// process messages when batch size or timeout period is reached
			if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
				processMessageBatch(ctx, messages)
				consumer.Commit()
				messages = messages[:0]
				lastCommit = time.Now()
			}
		}
	}
}

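// Illustrative kafka message value for the structures below (encoding/json
// matches field names case-insensitively; the real schema is defined by the
// producer):
//
//	{"payload": {"componentUUID": "<component-uuid>", "values": [1.0, 2.5]}}
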
// realTimeDataPayload carries the component UUID and the sampled values decoded from a kafka message
type realTimeDataPayload struct {
	ComponentUUID string
	Values        []float64
}

// realTimeData is the top-level structure of a real time data kafka message
type realTimeData struct {
	Payload realTimeDataPayload
}

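// parseKafkaMessage unmarshals a kafka message value into a realTimeData struct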
func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
	var data realTimeData
	err := json.Unmarshal(msgValue, &data)
	if err != nil {
		return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
	}
	return &data, nil
}

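// processRealTimeData validates the component referenced by the decoded data
// and forwards an anchor param config to that component's anchor channel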
func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
	componentUUID := realTimeData.Payload.ComponentUUID
	component, err := diagram.GetComponentMap(componentUUID)
	if err != nil {
		logger.Error(ctx, "query component info from diagram map by component id failed",
			"component_uuid", componentUUID, "error", err)
		return
	}

	componentType := component.Type
	if componentType != constants.DemoType {
		logger.Error(ctx, "can not process real time data of component type not equal DemoType",
			"component_uuid", componentUUID)
		return
	}

	var anchorName string
	var compareValUpperLimit, compareValLowerLimit float64
	var anchorRealTimeData []float64
	var calculateFunc func(anchorValue float64, args ...float64) float64

	anchorRealTimeData = append(anchorRealTimeData, realTimeData.Payload.Values...)

	anchorConfig := config.AnchorParamConfig{
		AnchorParamBaseConfig: config.AnchorParamBaseConfig{
			ComponentUUID:        componentUUID,
			AnchorName:           anchorName,
			CompareValUpperLimit: compareValUpperLimit,
			CompareValLowerLimit: compareValLowerLimit,
			AnchorRealTimeData:   anchorRealTimeData,
		},
		CalculateFunc:   calculateFunc,
		CalculateParams: []float64{},
	}

	anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
	if err != nil {
		logger.Error(ctx, "get anchor param chan failed",
			"component_uuid", componentUUID, "error", err)
		return
	}

	select {
	case anchorChan <- anchorConfig:
	case <-ctx.Done():
		logger.Info(ctx, "context done while sending to anchor chan")
	case <-time.After(5 * time.Second):
		logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID)
	}
}

// processMessageBatch batch processes kafka messages, spawning a goroutine for each successfully parsed one
func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
	for _, msg := range messages {
		realTimeData, err := parseKafkaMessage(msg.Value)
		if err != nil {
			logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg)
			continue
		}
		go processRealTimeData(ctx, realTimeData)
	}
}