Compare commits
3 Commits
d434a7737d
...
dff74222c6
| Author | SHA1 | Date |
|---|---|---|
|
|
dff74222c6 | |
|
|
9593c77c18 | |
|
|
8cbbfbd695 |
|
|
@ -0,0 +1,7 @@
|
||||||
|
// Package constants define constant variable
|
||||||
|
package constants
|
||||||
|
|
||||||
|
type contextKey string
|
||||||
|
|
||||||
|
// MeasurementUUIDKey define measurement uuid key into context
|
||||||
|
const MeasurementUUIDKey contextKey = "measurement_uuid"
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
// Package constants define constant variable
|
||||||
|
package constants
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TIBreachTriggerType define out of bounds type constant
|
||||||
|
TIBreachTriggerType = "trigger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TelemetryUpLimit define telemetry upper limit
|
||||||
|
TelemetryUpLimit = "up"
|
||||||
|
// TelemetryUpUpLimit define telemetry upper upper limit
|
||||||
|
TelemetryUpUpLimit = "upup"
|
||||||
|
|
||||||
|
// TelemetryDownLimit define telemetry limit
|
||||||
|
TelemetryDownLimit = "down"
|
||||||
|
// TelemetryDownDownLimit define telemetry lower lower limit
|
||||||
|
TelemetryDownDownLimit = "downdown"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TelesignalRaising define telesignal raising edge
|
||||||
|
TelesignalRaising = "raising"
|
||||||
|
// TelesignalFalling define telesignal falling edge
|
||||||
|
TelesignalFalling = "falling"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MinBreachCount define min breach count of real time data
|
||||||
|
MinBreachCount = 10
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,335 @@
|
||||||
|
// Package realtimedata define real time data operation functions
|
||||||
|
package realtimedata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
|
"modelRT/logger"
|
||||||
|
"modelRT/real-time-data/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
||||||
|
type RealTimeAnalyzer interface {
|
||||||
|
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
|
||||||
|
type teEventThresholds struct {
|
||||||
|
up float64
|
||||||
|
upup float64
|
||||||
|
down float64
|
||||||
|
downdown float64
|
||||||
|
isFloatCause bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
||||||
|
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
||||||
|
t := teEventThresholds{}
|
||||||
|
floatKeys := map[string]*float64{
|
||||||
|
"upup": &t.upup,
|
||||||
|
"up": &t.up,
|
||||||
|
"down": &t.down,
|
||||||
|
"downdown": &t.downdown,
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, ptr := range floatKeys {
|
||||||
|
if value, exists := cause[key]; exists {
|
||||||
|
if floatVal, ok := value.(float64); ok {
|
||||||
|
*ptr = floatVal
|
||||||
|
t.isFloatCause = true
|
||||||
|
} else {
|
||||||
|
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// quickly check mutual exclusion
|
||||||
|
if _, exists := cause["edge"]; exists && t.isFloatCause {
|
||||||
|
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
|
||||||
|
func getTEBreachType(value float64, t teEventThresholds) string {
|
||||||
|
if t.upup > 0 && value > t.upup {
|
||||||
|
return constants.TelemetryUpUpLimit
|
||||||
|
}
|
||||||
|
if t.up > 0 && value > t.up {
|
||||||
|
return constants.TelemetryUpLimit
|
||||||
|
}
|
||||||
|
if t.downdown > 0 && value < t.downdown {
|
||||||
|
return constants.TelemetryDownDownLimit
|
||||||
|
}
|
||||||
|
if t.down > 0 && value < t.down {
|
||||||
|
return constants.TelemetryDownLimit
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
|
||||||
|
type TEAnalyzer struct {
|
||||||
|
Thresholds teEventThresholds
|
||||||
|
}
|
||||||
|
|
||||||
|
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
||||||
|
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
||||||
|
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
||||||
|
}
|
||||||
|
|
||||||
|
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
||||||
|
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
||||||
|
windowSize := conf.minBreachCount
|
||||||
|
if windowSize <= 0 {
|
||||||
|
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark whether any events have been triggered in this batch
|
||||||
|
var eventTriggered bool
|
||||||
|
breachTriggers := map[string]bool{
|
||||||
|
"up": false, "upup": false, "down": false, "downdown": false,
|
||||||
|
}
|
||||||
|
|
||||||
|
// implement slide window to determine breach counts
|
||||||
|
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
||||||
|
window := realTimeValues[i : i+windowSize]
|
||||||
|
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
||||||
|
|
||||||
|
if firstValueBreachType == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
allMatch := true
|
||||||
|
for j := 1; j < windowSize; j++ {
|
||||||
|
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
||||||
|
if currentValueBreachType != firstValueBreachType {
|
||||||
|
allMatch = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if allMatch {
|
||||||
|
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
||||||
|
if !breachTriggers[firstValueBreachType] {
|
||||||
|
// trigger event
|
||||||
|
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
||||||
|
|
||||||
|
breachTriggers[firstValueBreachType] = true
|
||||||
|
eventTriggered = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventTriggered {
|
||||||
|
command, content := genTEEventCommandAndContent(conf.Action)
|
||||||
|
// TODO 考虑 content 是否可以为空,先期不允许
|
||||||
|
if command == "" || content == "" {
|
||||||
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
event.TriggerEventAction(ctx, command, content)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func genTEEventCommandAndContent(action map[string]any) (command string, content string) {
|
||||||
|
cmdValue, exist := action["command"]
|
||||||
|
if !exist {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
|
||||||
|
commandStr, ok := cmdValue.(string)
|
||||||
|
if !ok {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
command = commandStr
|
||||||
|
|
||||||
|
paramsValue, exist := action["parametes"]
|
||||||
|
if !exist {
|
||||||
|
return command, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
parameterSlice, ok := paramsValue.([]string)
|
||||||
|
if !ok {
|
||||||
|
return command, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
var builder strings.Builder
|
||||||
|
for i, parameter := range parameterSlice {
|
||||||
|
if i > 0 {
|
||||||
|
builder.WriteString(",")
|
||||||
|
}
|
||||||
|
builder.WriteString(parameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
return command, builder.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
|
||||||
|
type tiEventThresholds struct {
|
||||||
|
edge string
|
||||||
|
isFloatCause bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
||||||
|
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
||||||
|
edgeKey := "edge"
|
||||||
|
t := tiEventThresholds{
|
||||||
|
isFloatCause: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, exists := cause[edgeKey]; exists {
|
||||||
|
if strVal, ok := value.(string); ok {
|
||||||
|
switch strVal {
|
||||||
|
case "raising", "falling":
|
||||||
|
t.edge = strVal
|
||||||
|
return t, nil
|
||||||
|
default:
|
||||||
|
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
||||||
|
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
||||||
|
if t.edge == constants.TelesignalRaising {
|
||||||
|
if previousValue == 0.0 && currentValue == 1.0 {
|
||||||
|
return constants.TIBreachTriggerType
|
||||||
|
}
|
||||||
|
} else if t.edge == constants.TelesignalFalling {
|
||||||
|
if previousValue == 1.0 && currentValue == 0.0 {
|
||||||
|
return constants.TIBreachTriggerType
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
|
||||||
|
type TIAnalyzer struct {
|
||||||
|
Thresholds tiEventThresholds
|
||||||
|
}
|
||||||
|
|
||||||
|
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
||||||
|
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
||||||
|
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
||||||
|
}
|
||||||
|
|
||||||
|
// analyzeTIDataLogic define func to processing telesignal data and event triggering
|
||||||
|
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
|
||||||
|
windowSize := conf.minBreachCount
|
||||||
|
if windowSize <= 0 {
|
||||||
|
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
numDataPoints := len(realTimeValues)
|
||||||
|
if numDataPoints < 2 {
|
||||||
|
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// pre calculate the change event type for all adjacent point pairs
|
||||||
|
numChanges := numDataPoints - 1
|
||||||
|
changeBreachTypes := make([]string, numChanges)
|
||||||
|
|
||||||
|
for i := range numChanges {
|
||||||
|
previousValue := realTimeValues[i]
|
||||||
|
currentValue := realTimeValues[i+1]
|
||||||
|
|
||||||
|
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
|
||||||
|
}
|
||||||
|
|
||||||
|
if numChanges < windowSize {
|
||||||
|
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var eventTriggered bool
|
||||||
|
breachTriggers := map[string]bool{
|
||||||
|
constants.TIBreachTriggerType: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i <= numChanges-windowSize; i++ {
|
||||||
|
windowBreachTypes := changeBreachTypes[i : i+windowSize]
|
||||||
|
firstBreachType := windowBreachTypes[0]
|
||||||
|
|
||||||
|
if firstBreachType == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
allMatch := true
|
||||||
|
for j := 1; j < windowSize; j++ {
|
||||||
|
if windowBreachTypes[j] != firstBreachType {
|
||||||
|
allMatch = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if allMatch {
|
||||||
|
if !breachTriggers[firstBreachType] {
|
||||||
|
finalValueIndex := i + windowSize
|
||||||
|
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
|
||||||
|
|
||||||
|
breachTriggers[firstBreachType] = true
|
||||||
|
eventTriggered = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventTriggered {
|
||||||
|
command, content := genTIEventCommandAndContent(conf.Action)
|
||||||
|
// TODO 考虑 content 是否可以为空,先期不允许
|
||||||
|
if command == "" || content == "" {
|
||||||
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
event.TriggerEventAction(ctx, command, content)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
|
||||||
|
cmdValue, exist := action["command"]
|
||||||
|
if !exist {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
|
||||||
|
commandStr, ok := cmdValue.(string)
|
||||||
|
if !ok {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
command = commandStr
|
||||||
|
|
||||||
|
paramsValue, exist := action["parametes"]
|
||||||
|
if !exist {
|
||||||
|
return command, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
parameterSlice, ok := paramsValue.([]string)
|
||||||
|
if !ok {
|
||||||
|
return command, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
var builder strings.Builder
|
||||||
|
for i, parameter := range parameterSlice {
|
||||||
|
if i > 0 {
|
||||||
|
builder.WriteString(",")
|
||||||
|
}
|
||||||
|
builder.WriteString(parameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
return command, builder.String()
|
||||||
|
}
|
||||||
|
|
@ -1,13 +1,22 @@
|
||||||
// Package realtimedata define real time data operation functions
|
// Package realtimedata define real time data operation functions
|
||||||
package realtimedata
|
package realtimedata
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
// ComputeConfig define struct of measurement computation
|
// ComputeConfig define struct of measurement computation
|
||||||
type ComputeConfig struct {
|
type ComputeConfig struct {
|
||||||
Cause map[string]any
|
Cause map[string]any
|
||||||
Action map[string]any
|
Action map[string]any
|
||||||
StopGchan chan struct{}
|
// TODO 预留自由调整的入口
|
||||||
|
// min consecutive breach count
|
||||||
|
minBreachCount int
|
||||||
|
Duration int
|
||||||
|
DataSize int64
|
||||||
|
QueryKey string
|
||||||
|
StopGchan chan struct{}
|
||||||
|
Analyzer RealTimeAnalyzer
|
||||||
}
|
}
|
||||||
|
|
||||||
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,74 @@
|
||||||
|
// Package event define real time data evnet operation functions
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"modelRT/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type actionHandler func(ctx context.Context, content string) error
|
||||||
|
|
||||||
|
// actionDispatchMap define variable to store all action handler into map
|
||||||
|
var actionDispatchMap = map[string]actionHandler{
|
||||||
|
"info": handleInfoAction,
|
||||||
|
"warning": handleWarningAction,
|
||||||
|
"error": handleErrorAction,
|
||||||
|
"critical": handleCriticalAction,
|
||||||
|
"exception": handleExceptionAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriggerEventAction define func to trigger event by action in compute config
|
||||||
|
func TriggerEventAction(ctx context.Context, command string, content string) {
|
||||||
|
handler, exists := actionDispatchMap[command]
|
||||||
|
if !exists {
|
||||||
|
logger.Error(ctx, "unknown action command", "command", command)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := handler(ctx, content)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "action handler success", "command", command, "content", content)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleInfoAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send info level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger info event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleWarningAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send warning level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger Warning event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleErrorAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send error level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger error event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleCriticalAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send critical level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleExceptionAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send except level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -4,6 +4,7 @@ package realtimedata
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -11,6 +12,7 @@ import (
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
"modelRT/model"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
"modelRT/pool"
|
"modelRT/pool"
|
||||||
|
|
@ -45,7 +47,182 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO 启动协程准备查询 redis 数据进行计算
|
conf, err := initComputeConfig(measurement)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf == nil {
|
||||||
|
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uuidStr := measurement.ComponentUUID.String()
|
||||||
|
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
||||||
|
conf.StopGchan = make(chan struct{})
|
||||||
|
globalComputeState.Store(uuidStr, conf)
|
||||||
|
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
go continuousComputation(enrichedCtx, conf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
enableValue, exist := measurement.EventPlan["enable"]
|
||||||
|
enable, ok := enableValue.(bool)
|
||||||
|
if !exist {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !enable {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conf := &ComputeConfig{}
|
||||||
|
|
||||||
|
causeValue, exist := measurement.EventPlan["cause"]
|
||||||
|
if !exist {
|
||||||
|
return nil, errors.New("missing required field cause")
|
||||||
|
}
|
||||||
|
|
||||||
|
cause, ok := causeValue.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
||||||
|
}
|
||||||
|
conf.Cause, err = processCauseMap(cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actionValue, exist := measurement.EventPlan["action"]
|
||||||
|
if !exist {
|
||||||
|
return nil, errors.New("missing required field action")
|
||||||
|
}
|
||||||
|
action, ok := actionValue.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
||||||
|
}
|
||||||
|
conf.Action = action
|
||||||
|
|
||||||
|
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
||||||
|
}
|
||||||
|
conf.QueryKey = queryKey
|
||||||
|
conf.DataSize = int64(measurement.Size)
|
||||||
|
// TODO use constant values for temporary settings
|
||||||
|
conf.minBreachCount = constants.MinBreachCount
|
||||||
|
|
||||||
|
isFloatCause := false
|
||||||
|
if _, exists := conf.Cause["up"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["down"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["upup"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["downdown"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if isFloatCause {
|
||||||
|
// te config
|
||||||
|
teThresholds, err := parseTEThresholds(conf.Cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
||||||
|
}
|
||||||
|
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
||||||
|
} else {
|
||||||
|
// ti config
|
||||||
|
tiThresholds, err := parseTIThresholds(conf.Cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
||||||
|
}
|
||||||
|
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
||||||
|
}
|
||||||
|
|
||||||
|
return conf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func processCauseMap(data map[string]any) (map[string]any, error) {
|
||||||
|
causeResult := make(map[string]any)
|
||||||
|
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||||
|
|
||||||
|
var foundFloatKey bool
|
||||||
|
for _, key := range keysToExtract {
|
||||||
|
if value, exists := data[key]; exists {
|
||||||
|
|
||||||
|
foundFloatKey = true
|
||||||
|
|
||||||
|
// check value type
|
||||||
|
if floatVal, ok := value.(float64); ok {
|
||||||
|
causeResult[key] = floatVal
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if foundFloatKey == true {
|
||||||
|
return causeResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
edgeKey := "edge"
|
||||||
|
if value, exists := data[edgeKey]; exists {
|
||||||
|
if stringVal, ok := value.(string); ok {
|
||||||
|
switch stringVal {
|
||||||
|
case "raising":
|
||||||
|
fallthrough
|
||||||
|
case "falling":
|
||||||
|
causeResult[edgeKey] = stringVal
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
||||||
|
client := diagram.NewRedisClient()
|
||||||
|
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
||||||
|
duration := util.SecondsToDuration(conf.Duration)
|
||||||
|
ticker := time.NewTicker(duration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
startTimestamp := util.GenNanoTsStr()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-conf.StopGchan:
|
||||||
|
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
stopTimestamp := util.GenNanoTsStr()
|
||||||
|
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
startTimestamp = stopTimestamp
|
||||||
|
|
||||||
|
realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members)
|
||||||
|
if conf.Analyzer != nil {
|
||||||
|
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||||
|
} else {
|
||||||
|
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -144,17 +321,17 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type RealTimeDataPayload struct {
|
type realTimeDataPayload struct {
|
||||||
ComponentUUID string
|
ComponentUUID string
|
||||||
Values []float64
|
Values []float64
|
||||||
}
|
}
|
||||||
|
|
||||||
type RealTimeData struct {
|
type realTimeData struct {
|
||||||
Payload RealTimeDataPayload
|
Payload realTimeDataPayload
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
||||||
var realTimeData RealTimeData
|
var realTimeData realTimeData
|
||||||
err := json.Unmarshal(msgValue, &realTimeData)
|
err := json.Unmarshal(msgValue, &realTimeData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||||
|
|
@ -162,7 +339,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
||||||
return &realTimeData, nil
|
return &realTimeData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||||
componentUUID := realTimeData.Payload.ComponentUUID
|
componentUUID := realTimeData.Payload.ComponentUUID
|
||||||
component, err := diagram.GetComponentMap(componentUUID)
|
component, err := diagram.GetComponentMap(componentUUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -183,7 +360,6 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
||||||
var anchorRealTimeData []float64
|
var anchorRealTimeData []float64
|
||||||
var calculateFunc func(archorValue float64, args ...float64) float64
|
var calculateFunc func(archorValue float64, args ...float64) float64
|
||||||
|
|
||||||
// 收集实时数据
|
|
||||||
for _, param := range realTimeData.Payload.Values {
|
for _, param := range realTimeData.Payload.Values {
|
||||||
anchorRealTimeData = append(anchorRealTimeData, param)
|
anchorRealTimeData = append(anchorRealTimeData, param)
|
||||||
}
|
}
|
||||||
|
|
@ -207,10 +383,8 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO 使用select避免channel阻塞
|
|
||||||
select {
|
select {
|
||||||
case anchorChan <- anchorConfig:
|
case anchorChan <- anchorConfig:
|
||||||
// 成功发送
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info(ctx, "context done while sending to anchor chan")
|
logger.Info(ctx, "context done while sending to anchor chan")
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
// Package util provide some utility fun
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"modelRT/logger"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
||||||
|
func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 {
|
||||||
|
dataFloats := make([]float64, 0, len(members))
|
||||||
|
|
||||||
|
for _, member := range members {
|
||||||
|
valStr, ok := member.Member.(string)
|
||||||
|
if !ok {
|
||||||
|
logger.Warn(ctx, "redis zset member value is not a string,skipping")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
valFloat, err := strconv.ParseFloat(valStr, 64)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dataFloats = append(dataFloats, valFloat)
|
||||||
|
}
|
||||||
|
|
||||||
|
return dataFloats
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue