Compare commits
No commits in common. "dff74222c68d24aef1c109af0dec56992ae29474" and "d434a7737d1d6ef2c31e9798475c16ddf989ac96" have entirely different histories.
dff74222c6
...
d434a7737d
|
|
@ -1,7 +0,0 @@
|
||||||
// Package constants define constant variable
|
|
||||||
package constants
|
|
||||||
|
|
||||||
type contextKey string
|
|
||||||
|
|
||||||
// MeasurementUUIDKey define measurement uuid key into context
|
|
||||||
const MeasurementUUIDKey contextKey = "measurement_uuid"
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
// Package constants define constant variable
|
|
||||||
package constants
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TIBreachTriggerType define out of bounds type constant
|
|
||||||
TIBreachTriggerType = "trigger"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TelemetryUpLimit define telemetry upper limit
|
|
||||||
TelemetryUpLimit = "up"
|
|
||||||
// TelemetryUpUpLimit define telemetry upper upper limit
|
|
||||||
TelemetryUpUpLimit = "upup"
|
|
||||||
|
|
||||||
// TelemetryDownLimit define telemetry limit
|
|
||||||
TelemetryDownLimit = "down"
|
|
||||||
// TelemetryDownDownLimit define telemetry lower lower limit
|
|
||||||
TelemetryDownDownLimit = "downdown"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TelesignalRaising define telesignal raising edge
|
|
||||||
TelesignalRaising = "raising"
|
|
||||||
// TelesignalFalling define telesignal falling edge
|
|
||||||
TelesignalFalling = "falling"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// MinBreachCount define min breach count of real time data
|
|
||||||
MinBreachCount = 10
|
|
||||||
)
|
|
||||||
|
|
@ -1,335 +0,0 @@
|
||||||
// Package realtimedata define real time data operation functions
|
|
||||||
package realtimedata
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"modelRT/constants"
|
|
||||||
"modelRT/logger"
|
|
||||||
"modelRT/real-time-data/event"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
|
||||||
type RealTimeAnalyzer interface {
|
|
||||||
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
|
|
||||||
}
|
|
||||||
|
|
||||||
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
|
|
||||||
type teEventThresholds struct {
|
|
||||||
up float64
|
|
||||||
upup float64
|
|
||||||
down float64
|
|
||||||
downdown float64
|
|
||||||
isFloatCause bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
|
||||||
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
|
||||||
t := teEventThresholds{}
|
|
||||||
floatKeys := map[string]*float64{
|
|
||||||
"upup": &t.upup,
|
|
||||||
"up": &t.up,
|
|
||||||
"down": &t.down,
|
|
||||||
"downdown": &t.downdown,
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, ptr := range floatKeys {
|
|
||||||
if value, exists := cause[key]; exists {
|
|
||||||
if floatVal, ok := value.(float64); ok {
|
|
||||||
*ptr = floatVal
|
|
||||||
t.isFloatCause = true
|
|
||||||
} else {
|
|
||||||
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// quickly check mutual exclusion
|
|
||||||
if _, exists := cause["edge"]; exists && t.isFloatCause {
|
|
||||||
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
|
|
||||||
}
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
|
|
||||||
func getTEBreachType(value float64, t teEventThresholds) string {
|
|
||||||
if t.upup > 0 && value > t.upup {
|
|
||||||
return constants.TelemetryUpUpLimit
|
|
||||||
}
|
|
||||||
if t.up > 0 && value > t.up {
|
|
||||||
return constants.TelemetryUpLimit
|
|
||||||
}
|
|
||||||
if t.downdown > 0 && value < t.downdown {
|
|
||||||
return constants.TelemetryDownDownLimit
|
|
||||||
}
|
|
||||||
if t.down > 0 && value < t.down {
|
|
||||||
return constants.TelemetryDownLimit
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
|
|
||||||
type TEAnalyzer struct {
|
|
||||||
Thresholds teEventThresholds
|
|
||||||
}
|
|
||||||
|
|
||||||
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
|
||||||
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
|
||||||
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
|
||||||
}
|
|
||||||
|
|
||||||
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
|
||||||
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
|
||||||
windowSize := conf.minBreachCount
|
|
||||||
if windowSize <= 0 {
|
|
||||||
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// mark whether any events have been triggered in this batch
|
|
||||||
var eventTriggered bool
|
|
||||||
breachTriggers := map[string]bool{
|
|
||||||
"up": false, "upup": false, "down": false, "downdown": false,
|
|
||||||
}
|
|
||||||
|
|
||||||
// implement slide window to determine breach counts
|
|
||||||
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
|
||||||
window := realTimeValues[i : i+windowSize]
|
|
||||||
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
|
||||||
|
|
||||||
if firstValueBreachType == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
allMatch := true
|
|
||||||
for j := 1; j < windowSize; j++ {
|
|
||||||
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
|
||||||
if currentValueBreachType != firstValueBreachType {
|
|
||||||
allMatch = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if allMatch {
|
|
||||||
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
|
||||||
if !breachTriggers[firstValueBreachType] {
|
|
||||||
// trigger event
|
|
||||||
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
|
||||||
|
|
||||||
breachTriggers[firstValueBreachType] = true
|
|
||||||
eventTriggered = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if eventTriggered {
|
|
||||||
command, content := genTEEventCommandAndContent(conf.Action)
|
|
||||||
// TODO 考虑 content 是否可以为空,先期不允许
|
|
||||||
if command == "" || content == "" {
|
|
||||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
event.TriggerEventAction(ctx, command, content)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
|
||||||
}
|
|
||||||
|
|
||||||
func genTEEventCommandAndContent(action map[string]any) (command string, content string) {
|
|
||||||
cmdValue, exist := action["command"]
|
|
||||||
if !exist {
|
|
||||||
return "", ""
|
|
||||||
}
|
|
||||||
|
|
||||||
commandStr, ok := cmdValue.(string)
|
|
||||||
if !ok {
|
|
||||||
return "", ""
|
|
||||||
}
|
|
||||||
command = commandStr
|
|
||||||
|
|
||||||
paramsValue, exist := action["parametes"]
|
|
||||||
if !exist {
|
|
||||||
return command, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
parameterSlice, ok := paramsValue.([]string)
|
|
||||||
if !ok {
|
|
||||||
return command, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
var builder strings.Builder
|
|
||||||
for i, parameter := range parameterSlice {
|
|
||||||
if i > 0 {
|
|
||||||
builder.WriteString(",")
|
|
||||||
}
|
|
||||||
builder.WriteString(parameter)
|
|
||||||
}
|
|
||||||
|
|
||||||
return command, builder.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
|
|
||||||
type tiEventThresholds struct {
|
|
||||||
edge string
|
|
||||||
isFloatCause bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
|
||||||
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
|
||||||
edgeKey := "edge"
|
|
||||||
t := tiEventThresholds{
|
|
||||||
isFloatCause: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
if value, exists := cause[edgeKey]; exists {
|
|
||||||
if strVal, ok := value.(string); ok {
|
|
||||||
switch strVal {
|
|
||||||
case "raising", "falling":
|
|
||||||
t.edge = strVal
|
|
||||||
return t, nil
|
|
||||||
default:
|
|
||||||
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
|
||||||
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
|
||||||
if t.edge == constants.TelesignalRaising {
|
|
||||||
if previousValue == 0.0 && currentValue == 1.0 {
|
|
||||||
return constants.TIBreachTriggerType
|
|
||||||
}
|
|
||||||
} else if t.edge == constants.TelesignalFalling {
|
|
||||||
if previousValue == 1.0 && currentValue == 0.0 {
|
|
||||||
return constants.TIBreachTriggerType
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
|
|
||||||
type TIAnalyzer struct {
|
|
||||||
Thresholds tiEventThresholds
|
|
||||||
}
|
|
||||||
|
|
||||||
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
|
||||||
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
|
||||||
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
|
||||||
}
|
|
||||||
|
|
||||||
// analyzeTIDataLogic define func to processing telesignal data and event triggering
|
|
||||||
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
|
|
||||||
windowSize := conf.minBreachCount
|
|
||||||
if windowSize <= 0 {
|
|
||||||
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
numDataPoints := len(realTimeValues)
|
|
||||||
if numDataPoints < 2 {
|
|
||||||
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// pre calculate the change event type for all adjacent point pairs
|
|
||||||
numChanges := numDataPoints - 1
|
|
||||||
changeBreachTypes := make([]string, numChanges)
|
|
||||||
|
|
||||||
for i := range numChanges {
|
|
||||||
previousValue := realTimeValues[i]
|
|
||||||
currentValue := realTimeValues[i+1]
|
|
||||||
|
|
||||||
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
|
|
||||||
}
|
|
||||||
|
|
||||||
if numChanges < windowSize {
|
|
||||||
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var eventTriggered bool
|
|
||||||
breachTriggers := map[string]bool{
|
|
||||||
constants.TIBreachTriggerType: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i <= numChanges-windowSize; i++ {
|
|
||||||
windowBreachTypes := changeBreachTypes[i : i+windowSize]
|
|
||||||
firstBreachType := windowBreachTypes[0]
|
|
||||||
|
|
||||||
if firstBreachType == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
allMatch := true
|
|
||||||
for j := 1; j < windowSize; j++ {
|
|
||||||
if windowBreachTypes[j] != firstBreachType {
|
|
||||||
allMatch = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if allMatch {
|
|
||||||
if !breachTriggers[firstBreachType] {
|
|
||||||
finalValueIndex := i + windowSize
|
|
||||||
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
|
|
||||||
|
|
||||||
breachTriggers[firstBreachType] = true
|
|
||||||
eventTriggered = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if eventTriggered {
|
|
||||||
command, content := genTIEventCommandAndContent(conf.Action)
|
|
||||||
// TODO 考虑 content 是否可以为空,先期不允许
|
|
||||||
if command == "" || content == "" {
|
|
||||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
event.TriggerEventAction(ctx, command, content)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
|
||||||
}
|
|
||||||
|
|
||||||
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
|
|
||||||
cmdValue, exist := action["command"]
|
|
||||||
if !exist {
|
|
||||||
return "", ""
|
|
||||||
}
|
|
||||||
|
|
||||||
commandStr, ok := cmdValue.(string)
|
|
||||||
if !ok {
|
|
||||||
return "", ""
|
|
||||||
}
|
|
||||||
command = commandStr
|
|
||||||
|
|
||||||
paramsValue, exist := action["parametes"]
|
|
||||||
if !exist {
|
|
||||||
return command, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
parameterSlice, ok := paramsValue.([]string)
|
|
||||||
if !ok {
|
|
||||||
return command, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
var builder strings.Builder
|
|
||||||
for i, parameter := range parameterSlice {
|
|
||||||
if i > 0 {
|
|
||||||
builder.WriteString(",")
|
|
||||||
}
|
|
||||||
builder.WriteString(parameter)
|
|
||||||
}
|
|
||||||
|
|
||||||
return command, builder.String()
|
|
||||||
}
|
|
||||||
|
|
@ -1,22 +1,13 @@
|
||||||
// Package realtimedata define real time data operation functions
|
// Package realtimedata define real time data operation functions
|
||||||
package realtimedata
|
package realtimedata
|
||||||
|
|
||||||
import (
|
import "sync"
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ComputeConfig define struct of measurement computation
|
// ComputeConfig define struct of measurement computation
|
||||||
type ComputeConfig struct {
|
type ComputeConfig struct {
|
||||||
Cause map[string]any
|
Cause map[string]any
|
||||||
Action map[string]any
|
Action map[string]any
|
||||||
// TODO 预留自由调整的入口
|
StopGchan chan struct{}
|
||||||
// min consecutive breach count
|
|
||||||
minBreachCount int
|
|
||||||
Duration int
|
|
||||||
DataSize int64
|
|
||||||
QueryKey string
|
|
||||||
StopGchan chan struct{}
|
|
||||||
Analyzer RealTimeAnalyzer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
||||||
|
|
|
||||||
|
|
@ -1,74 +0,0 @@
|
||||||
// Package event define real time data evnet operation functions
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"modelRT/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
type actionHandler func(ctx context.Context, content string) error
|
|
||||||
|
|
||||||
// actionDispatchMap define variable to store all action handler into map
|
|
||||||
var actionDispatchMap = map[string]actionHandler{
|
|
||||||
"info": handleInfoAction,
|
|
||||||
"warning": handleWarningAction,
|
|
||||||
"error": handleErrorAction,
|
|
||||||
"critical": handleCriticalAction,
|
|
||||||
"exception": handleExceptionAction,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TriggerEventAction define func to trigger event by action in compute config
|
|
||||||
func TriggerEventAction(ctx context.Context, command string, content string) {
|
|
||||||
handler, exists := actionDispatchMap[command]
|
|
||||||
if !exists {
|
|
||||||
logger.Error(ctx, "unknown action command", "command", command)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err := handler(ctx, content)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Info(ctx, "action handler success", "command", command, "content", content)
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleInfoAction(ctx context.Context, content string) error {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send info level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger info event", "message", actionParams)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleWarningAction(ctx context.Context, content string) error {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send warning level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger Warning event", "message", actionParams)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleErrorAction(ctx context.Context, content string) error {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send error level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger error event", "message", actionParams)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleCriticalAction(ctx context.Context, content string) error {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send critical level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleExceptionAction(ctx context.Context, content string) error {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send except level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
@ -4,7 +4,6 @@ package realtimedata
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -12,7 +11,6 @@ import (
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/model"
|
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
"modelRT/pool"
|
"modelRT/pool"
|
||||||
|
|
@ -47,182 +45,7 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
conf, err := initComputeConfig(measurement)
|
// TODO 启动协程准备查询 redis 数据进行计算
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf == nil {
|
|
||||||
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uuidStr := measurement.ComponentUUID.String()
|
|
||||||
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
|
||||||
conf.StopGchan = make(chan struct{})
|
|
||||||
globalComputeState.Store(uuidStr, conf)
|
|
||||||
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
|
|
||||||
go continuousComputation(enrichedCtx, conf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
enableValue, exist := measurement.EventPlan["enable"]
|
|
||||||
enable, ok := enableValue.(bool)
|
|
||||||
if !exist {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !enable {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
conf := &ComputeConfig{}
|
|
||||||
|
|
||||||
causeValue, exist := measurement.EventPlan["cause"]
|
|
||||||
if !exist {
|
|
||||||
return nil, errors.New("missing required field cause")
|
|
||||||
}
|
|
||||||
|
|
||||||
cause, ok := causeValue.(map[string]any)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
|
||||||
}
|
|
||||||
conf.Cause, err = processCauseMap(cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
actionValue, exist := measurement.EventPlan["action"]
|
|
||||||
if !exist {
|
|
||||||
return nil, errors.New("missing required field action")
|
|
||||||
}
|
|
||||||
action, ok := actionValue.(map[string]any)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
|
||||||
}
|
|
||||||
conf.Action = action
|
|
||||||
|
|
||||||
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
|
||||||
}
|
|
||||||
conf.QueryKey = queryKey
|
|
||||||
conf.DataSize = int64(measurement.Size)
|
|
||||||
// TODO use constant values for temporary settings
|
|
||||||
conf.minBreachCount = constants.MinBreachCount
|
|
||||||
|
|
||||||
isFloatCause := false
|
|
||||||
if _, exists := conf.Cause["up"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["down"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["upup"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["downdown"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if isFloatCause {
|
|
||||||
// te config
|
|
||||||
teThresholds, err := parseTEThresholds(conf.Cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
|
||||||
}
|
|
||||||
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
|
||||||
} else {
|
|
||||||
// ti config
|
|
||||||
tiThresholds, err := parseTIThresholds(conf.Cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
|
||||||
}
|
|
||||||
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func processCauseMap(data map[string]any) (map[string]any, error) {
|
|
||||||
causeResult := make(map[string]any)
|
|
||||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
|
||||||
|
|
||||||
var foundFloatKey bool
|
|
||||||
for _, key := range keysToExtract {
|
|
||||||
if value, exists := data[key]; exists {
|
|
||||||
|
|
||||||
foundFloatKey = true
|
|
||||||
|
|
||||||
// check value type
|
|
||||||
if floatVal, ok := value.(float64); ok {
|
|
||||||
causeResult[key] = floatVal
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if foundFloatKey == true {
|
|
||||||
return causeResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
edgeKey := "edge"
|
|
||||||
if value, exists := data[edgeKey]; exists {
|
|
||||||
if stringVal, ok := value.(string); ok {
|
|
||||||
switch stringVal {
|
|
||||||
case "raising":
|
|
||||||
fallthrough
|
|
||||||
case "falling":
|
|
||||||
causeResult[edgeKey] = stringVal
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
|
||||||
client := diagram.NewRedisClient()
|
|
||||||
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
|
||||||
duration := util.SecondsToDuration(conf.Duration)
|
|
||||||
ticker := time.NewTicker(duration)
|
|
||||||
defer ticker.Stop()
|
|
||||||
startTimestamp := util.GenNanoTsStr()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-conf.StopGchan:
|
|
||||||
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
stopTimestamp := util.GenNanoTsStr()
|
|
||||||
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
startTimestamp = stopTimestamp
|
|
||||||
|
|
||||||
realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members)
|
|
||||||
if conf.Analyzer != nil {
|
|
||||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
|
||||||
} else {
|
|
||||||
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -321,17 +144,17 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type realTimeDataPayload struct {
|
type RealTimeDataPayload struct {
|
||||||
ComponentUUID string
|
ComponentUUID string
|
||||||
Values []float64
|
Values []float64
|
||||||
}
|
}
|
||||||
|
|
||||||
type realTimeData struct {
|
type RealTimeData struct {
|
||||||
Payload realTimeDataPayload
|
Payload RealTimeDataPayload
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
||||||
var realTimeData realTimeData
|
var realTimeData RealTimeData
|
||||||
err := json.Unmarshal(msgValue, &realTimeData)
|
err := json.Unmarshal(msgValue, &realTimeData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||||
|
|
@ -339,7 +162,7 @@ func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
||||||
return &realTimeData, nil
|
return &realTimeData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
||||||
componentUUID := realTimeData.Payload.ComponentUUID
|
componentUUID := realTimeData.Payload.ComponentUUID
|
||||||
component, err := diagram.GetComponentMap(componentUUID)
|
component, err := diagram.GetComponentMap(componentUUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -360,6 +183,7 @@ func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||||
var anchorRealTimeData []float64
|
var anchorRealTimeData []float64
|
||||||
var calculateFunc func(archorValue float64, args ...float64) float64
|
var calculateFunc func(archorValue float64, args ...float64) float64
|
||||||
|
|
||||||
|
// 收集实时数据
|
||||||
for _, param := range realTimeData.Payload.Values {
|
for _, param := range realTimeData.Payload.Values {
|
||||||
anchorRealTimeData = append(anchorRealTimeData, param)
|
anchorRealTimeData = append(anchorRealTimeData, param)
|
||||||
}
|
}
|
||||||
|
|
@ -383,8 +207,10 @@ func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO 使用select避免channel阻塞
|
||||||
select {
|
select {
|
||||||
case anchorChan <- anchorConfig:
|
case anchorChan <- anchorConfig:
|
||||||
|
// 成功发送
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info(ctx, "context done while sending to anchor chan")
|
logger.Info(ctx, "context done while sending to anchor chan")
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
|
|
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
// Package util provide some utility fun
|
|
||||||
package util
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"modelRT/logger"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
|
||||||
func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 {
|
|
||||||
dataFloats := make([]float64, 0, len(members))
|
|
||||||
|
|
||||||
for _, member := range members {
|
|
||||||
valStr, ok := member.Member.(string)
|
|
||||||
if !ok {
|
|
||||||
logger.Warn(ctx, "redis zset member value is not a string,skipping")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
valFloat, err := strconv.ParseFloat(valStr, 64)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dataFloats = append(dataFloats, valFloat)
|
|
||||||
}
|
|
||||||
|
|
||||||
return dataFloats
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue