Compare commits

..

3 Commits

7 changed files with 677 additions and 14 deletions

7
constants/context.go Normal file
View File

@ -0,0 +1,7 @@
// Package constants define constant variable
package constants
type contextKey string
// MeasurementUUIDKey define measurement uuid key into context
const MeasurementUUIDKey contextKey = "measurement_uuid"

31
constants/event.go Normal file
View File

@ -0,0 +1,31 @@
// Package constants define constant variable
package constants
const (
// TIBreachTriggerType define out of bounds type constant
TIBreachTriggerType = "trigger"
)
const (
// TelemetryUpLimit define telemetry upper limit
TelemetryUpLimit = "up"
// TelemetryUpUpLimit define telemetry upper upper limit
TelemetryUpUpLimit = "upup"
// TelemetryDownLimit define telemetry limit
TelemetryDownLimit = "down"
// TelemetryDownDownLimit define telemetry lower lower limit
TelemetryDownDownLimit = "downdown"
)
const (
// TelesignalRaising define telesignal raising edge
TelesignalRaising = "raising"
// TelesignalFalling define telesignal falling edge
TelesignalFalling = "falling"
)
const (
// MinBreachCount define min breach count of real time data
MinBreachCount = 10
)

View File

@ -0,0 +1,335 @@
// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"errors"
"fmt"
"strings"
"modelRT/constants"
"modelRT/logger"
"modelRT/real-time-data/event"
)
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
type RealTimeAnalyzer interface {
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
}
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
type teEventThresholds struct {
up float64
upup float64
down float64
downdown float64
isFloatCause bool
}
// parseTEThresholds define func to parse telemetry thresholds by casue map
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
t := teEventThresholds{}
floatKeys := map[string]*float64{
"upup": &t.upup,
"up": &t.up,
"down": &t.down,
"downdown": &t.downdown,
}
for key, ptr := range floatKeys {
if value, exists := cause[key]; exists {
if floatVal, ok := value.(float64); ok {
*ptr = floatVal
t.isFloatCause = true
} else {
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
}
}
}
// quickly check mutual exclusion
if _, exists := cause["edge"]; exists && t.isFloatCause {
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
}
return t, nil
}
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
func getTEBreachType(value float64, t teEventThresholds) string {
if t.upup > 0 && value > t.upup {
return constants.TelemetryUpUpLimit
}
if t.up > 0 && value > t.up {
return constants.TelemetryUpLimit
}
if t.downdown > 0 && value < t.downdown {
return constants.TelemetryDownDownLimit
}
if t.down > 0 && value < t.down {
return constants.TelemetryDownLimit
}
return ""
}
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
type TEAnalyzer struct {
Thresholds teEventThresholds
}
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
}
// analyzeTEDataLogic define func to processing telemetry data and event triggering
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount
if windowSize <= 0 {
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
return
}
// mark whether any events have been triggered in this batch
var eventTriggered bool
breachTriggers := map[string]bool{
"up": false, "upup": false, "down": false, "downdown": false,
}
// implement slide window to determine breach counts
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
window := realTimeValues[i : i+windowSize]
firstValueBreachType := getTEBreachType(window[0], thresholds)
if firstValueBreachType == "" {
continue
}
allMatch := true
for j := 1; j < windowSize; j++ {
currentValueBreachType := getTEBreachType(window[j], thresholds)
if currentValueBreachType != firstValueBreachType {
allMatch = false
break
}
}
if allMatch {
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
if !breachTriggers[firstValueBreachType] {
// trigger event
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
breachTriggers[firstValueBreachType] = true
eventTriggered = true
}
}
}
if eventTriggered {
command, content := genTEEventCommandAndContent(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
event.TriggerEventAction(ctx, command, content)
return
}
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
}
func genTEEventCommandAndContent(action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
return "", ""
}
commandStr, ok := cmdValue.(string)
if !ok {
return "", ""
}
command = commandStr
paramsValue, exist := action["parametes"]
if !exist {
return command, ""
}
parameterSlice, ok := paramsValue.([]string)
if !ok {
return command, ""
}
var builder strings.Builder
for i, parameter := range parameterSlice {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString(parameter)
}
return command, builder.String()
}
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
type tiEventThresholds struct {
edge string
isFloatCause bool
}
// parseTEThresholds define func to parse telesignal thresholds by casue map
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
edgeKey := "edge"
t := tiEventThresholds{
isFloatCause: false,
}
if value, exists := cause[edgeKey]; exists {
if strVal, ok := value.(string); ok {
switch strVal {
case "raising", "falling":
t.edge = strVal
return t, nil
default:
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
}
} else {
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
}
}
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
}
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
if t.edge == constants.TelesignalRaising {
if previousValue == 0.0 && currentValue == 1.0 {
return constants.TIBreachTriggerType
}
} else if t.edge == constants.TelesignalFalling {
if previousValue == 1.0 && currentValue == 0.0 {
return constants.TIBreachTriggerType
}
}
return ""
}
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
type TIAnalyzer struct {
Thresholds tiEventThresholds
}
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
}
// analyzeTIDataLogic define func to processing telesignal data and event triggering
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount
if windowSize <= 0 {
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
return
}
numDataPoints := len(realTimeValues)
if numDataPoints < 2 {
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
return
}
// pre calculate the change event type for all adjacent point pairs
numChanges := numDataPoints - 1
changeBreachTypes := make([]string, numChanges)
for i := range numChanges {
previousValue := realTimeValues[i]
currentValue := realTimeValues[i+1]
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
}
if numChanges < windowSize {
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
return
}
var eventTriggered bool
breachTriggers := map[string]bool{
constants.TIBreachTriggerType: false,
}
for i := 0; i <= numChanges-windowSize; i++ {
windowBreachTypes := changeBreachTypes[i : i+windowSize]
firstBreachType := windowBreachTypes[0]
if firstBreachType == "" {
continue
}
allMatch := true
for j := 1; j < windowSize; j++ {
if windowBreachTypes[j] != firstBreachType {
allMatch = false
break
}
}
if allMatch {
if !breachTriggers[firstBreachType] {
finalValueIndex := i + windowSize
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
breachTriggers[firstBreachType] = true
eventTriggered = true
}
}
}
if eventTriggered {
command, content := genTIEventCommandAndContent(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
event.TriggerEventAction(ctx, command, content)
return
}
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
}
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
return "", ""
}
commandStr, ok := cmdValue.(string)
if !ok {
return "", ""
}
command = commandStr
paramsValue, exist := action["parametes"]
if !exist {
return command, ""
}
parameterSlice, ok := paramsValue.([]string)
if !ok {
return command, ""
}
var builder strings.Builder
for i, parameter := range parameterSlice {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString(parameter)
}
return command, builder.String()
}

View File

@ -1,13 +1,22 @@
// Package realtimedata define real time data operation functions
package realtimedata
import "sync"
import (
"sync"
)
// ComputeConfig define struct of measurement computation
type ComputeConfig struct {
Cause map[string]any
Action map[string]any
StopGchan chan struct{}
Cause map[string]any
Action map[string]any
// TODO 预留自由调整的入口
// min consecutive breach count
minBreachCount int
Duration int
DataSize int64
QueryKey string
StopGchan chan struct{}
Analyzer RealTimeAnalyzer
}
// MeasComputeState define struct of manages the state of measurement computations using sync.Map

View File

@ -0,0 +1,74 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"modelRT/logger"
)
type actionHandler func(ctx context.Context, content string) error
// actionDispatchMap define variable to store all action handler into map
var actionDispatchMap = map[string]actionHandler{
"info": handleInfoAction,
"warning": handleWarningAction,
"error": handleErrorAction,
"critical": handleCriticalAction,
"exception": handleExceptionAction,
}
// TriggerEventAction define func to trigger event by action in compute config
func TriggerEventAction(ctx context.Context, command string, content string) {
handler, exists := actionDispatchMap[command]
if !exists {
logger.Error(ctx, "unknown action command", "command", command)
return
}
err := handler(ctx, content)
if err != nil {
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
return
}
logger.Info(ctx, "action handler success", "command", command, "content", content)
}
func handleInfoAction(ctx context.Context, content string) error {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send info level event using actionParams ...
logger.Warn(ctx, "trigger info event", "message", actionParams)
return nil
}
func handleWarningAction(ctx context.Context, content string) error {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send warning level event using actionParams ...
logger.Warn(ctx, "trigger Warning event", "message", actionParams)
return nil
}
func handleErrorAction(ctx context.Context, content string) error {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send error level event using actionParams ...
logger.Warn(ctx, "trigger error event", "message", actionParams)
return nil
}
func handleCriticalAction(ctx context.Context, content string) error {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send critical level event using actionParams ...
logger.Warn(ctx, "trigger critical event", "message", actionParams)
return nil
}
func handleExceptionAction(ctx context.Context, content string) error {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send except level event using actionParams ...
logger.Warn(ctx, "trigger except event", "message", actionParams)
return nil
}

View File

@ -4,6 +4,7 @@ package realtimedata
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
@ -11,6 +12,7 @@ import (
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/orm"
"modelRT/pool"
@ -45,7 +47,182 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem
continue
}
// TODO 启动协程准备查询 redis 数据进行计算
conf, err := initComputeConfig(measurement)
if err != nil {
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
continue
}
if conf == nil {
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
continue
}
uuidStr := measurement.ComponentUUID.String()
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
conf.StopGchan = make(chan struct{})
globalComputeState.Store(uuidStr, conf)
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
go continuousComputation(enrichedCtx, conf)
}
}
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
var err error
enableValue, exist := measurement.EventPlan["enable"]
enable, ok := enableValue.(bool)
if !exist {
return nil, nil
}
if !ok {
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
}
if !enable {
return nil, nil
}
conf := &ComputeConfig{}
causeValue, exist := measurement.EventPlan["cause"]
if !exist {
return nil, errors.New("missing required field cause")
}
cause, ok := causeValue.(map[string]any)
if !ok {
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
}
conf.Cause, err = processCauseMap(cause)
if err != nil {
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
}
actionValue, exist := measurement.EventPlan["action"]
if !exist {
return nil, errors.New("missing required field action")
}
action, ok := actionValue.(map[string]any)
if !ok {
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
}
conf.Action = action
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
if err != nil {
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
}
conf.QueryKey = queryKey
conf.DataSize = int64(measurement.Size)
// TODO use constant values for temporary settings
conf.minBreachCount = constants.MinBreachCount
isFloatCause := false
if _, exists := conf.Cause["up"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["down"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["upup"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["downdown"]; exists {
isFloatCause = true
}
if isFloatCause {
// te config
teThresholds, err := parseTEThresholds(conf.Cause)
if err != nil {
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
}
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
} else {
// ti config
tiThresholds, err := parseTIThresholds(conf.Cause)
if err != nil {
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
}
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
}
return conf, nil
}
func processCauseMap(data map[string]any) (map[string]any, error) {
causeResult := make(map[string]any)
keysToExtract := []string{"up", "down", "upup", "downdown"}
var foundFloatKey bool
for _, key := range keysToExtract {
if value, exists := data[key]; exists {
foundFloatKey = true
// check value type
if floatVal, ok := value.(float64); ok {
causeResult[key] = floatVal
} else {
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
}
}
}
if foundFloatKey == true {
return causeResult, nil
}
edgeKey := "edge"
if value, exists := data[edgeKey]; exists {
if stringVal, ok := value.(string); ok {
switch stringVal {
case "raising":
fallthrough
case "falling":
causeResult[edgeKey] = stringVal
default:
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
}
} else {
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
}
} else {
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
}
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
}
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
client := diagram.NewRedisClient()
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
duration := util.SecondsToDuration(conf.Duration)
ticker := time.NewTicker(duration)
defer ticker.Stop()
startTimestamp := util.GenNanoTsStr()
for {
select {
case <-conf.StopGchan:
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
return
case <-ctx.Done():
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
return
case <-ticker.C:
stopTimestamp := util.GenNanoTsStr()
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp)
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
continue
}
startTimestamp = stopTimestamp
realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members)
if conf.Analyzer != nil {
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
} else {
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
}
}
}
}
@ -144,17 +321,17 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
}
}
type RealTimeDataPayload struct {
type realTimeDataPayload struct {
ComponentUUID string
Values []float64
}
type RealTimeData struct {
Payload RealTimeDataPayload
type realTimeData struct {
Payload realTimeDataPayload
}
func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
var realTimeData RealTimeData
func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
var realTimeData realTimeData
err := json.Unmarshal(msgValue, &realTimeData)
if err != nil {
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
@ -162,7 +339,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
return &realTimeData, nil
}
func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
componentUUID := realTimeData.Payload.ComponentUUID
component, err := diagram.GetComponentMap(componentUUID)
if err != nil {
@ -183,7 +360,6 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
var anchorRealTimeData []float64
var calculateFunc func(archorValue float64, args ...float64) float64
// 收集实时数据
for _, param := range realTimeData.Payload.Values {
anchorRealTimeData = append(anchorRealTimeData, param)
}
@ -207,10 +383,8 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
return
}
// TODO 使用select避免channel阻塞
select {
case anchorChan <- anchorConfig:
// 成功发送
case <-ctx.Done():
logger.Info(ctx, "context done while sending to anchor chan")
case <-time.After(5 * time.Second):

33
util/convert.go Normal file
View File

@ -0,0 +1,33 @@
// Package util provide some utility fun
package util
import (
"context"
"strconv"
"modelRT/logger"
"github.com/redis/go-redis/v9"
)
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 {
dataFloats := make([]float64, 0, len(members))
for _, member := range members {
valStr, ok := member.Member.(string)
if !ok {
logger.Warn(ctx, "redis zset member value is not a string,skipping")
continue
}
valFloat, err := strconv.ParseFloat(valStr, 64)
if err != nil {
logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err)
continue
}
dataFloats = append(dataFloats, valFloat)
}
return dataFloats
}