2025-11-17 16:39:26 +08:00
// Package realtimedata defines the operations performed on real-time data.
package realtimedata
import (
"context"
"errors"
"fmt"
2025-11-18 16:46:47 +08:00
"strings"
2025-11-17 16:39:26 +08:00
"modelRT/constants"
"modelRT/logger"
2025-11-18 16:46:47 +08:00
"modelRT/real-time-data/event"
2025-11-17 16:39:26 +08:00
)
2025-11-18 16:46:47 +08:00
// RealTimeAnalyzer defines the general interface for real-time data analysis and event triggering.
2025-11-17 16:39:26 +08:00
type RealTimeAnalyzer interface {
// AnalyzeAndTriggerEvent analyzes one batch of real-time values using conf.
// It has no return value; the implementations in this file (TEAnalyzer and
// TIAnalyzer) hand the batch to their analysis routine together with the
// thresholds they store.
AnalyzeAndTriggerEvent ( ctx context . Context , conf * ComputeConfig , realTimeValues [ ] float64 )
}
// teEventThresholds holds the telemetry limits parsed from the cause field of
// the configuration.
//
// getTEBreachType only evaluates a limit whose value is greater than zero, so
// a zero value behaves as "not configured".
type teEventThresholds struct {
	// up and upup are the upper and upper-upper limits; down and downdown are
	// the lower and lower-lower limits.
	up, upup, down, downdown float64
	// isFloatCause is set by parseTEThresholds when at least one limit key
	// was present in the cause map.
	isFloatCause bool
}
2026-02-26 16:48:12 +08:00
// teBreachTrigger records one telemetry breach detected by analyzeTEDataLogic
// together with the options used when its event is triggered.
type teBreachTrigger struct {
// breachType is the value returned by getTEBreachType for the breaching window.
breachType string
// triggered is written as false when analyzeTEDataLogic creates the record.
// NOTE(review): no reader of this field is visible in this file - confirm it is still needed.
triggered bool
// triggeredValues holds the samples of the window that caused the breach.
triggeredValues [ ] float64
// eventOpts are the options passed to event.TriggerEventAction for this breach.
eventOpts [ ] event . EventOption
}
2025-11-17 16:39:26 +08:00
// parseTEThresholds parses the telemetry thresholds from the cause map.
//
// The keys "upup", "up", "down" and "downdown" are optional. Each one that is
// present must hold a float64, otherwise an error naming the key is returned.
// Keys are validated in that fixed order, so the reported error is
// deterministic when several keys are malformed. isFloatCause is set when at
// least one of the keys is present.
//
// An error is also returned when a threshold key and the "edge" key are both
// present, because the two kinds of cause are mutually exclusive. On error the
// returned thresholds are the zero value.
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
	t := teEventThresholds{}
	// An ordered table is used instead of a map: map iteration order is
	// random and would make the reported error depend on the run.
	floatKeys := []struct {
		key string
		ptr *float64
	}{
		{"upup", &t.upup},
		{"up", &t.up},
		{"down", &t.down},
		{"downdown", &t.downdown},
	}

	for _, fk := range floatKeys {
		value, exists := cause[fk.key]
		if !exists {
			continue
		}
		floatVal, ok := value.(float64)
		if !ok {
			return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", fk.key, value)
		}
		*fk.ptr = floatVal
		t.isFloatCause = true
	}

	// quickly check mutual exclusion
	if _, exists := cause["edge"]; exists && t.isFloatCause {
		return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
	}
	return t, nil
}
// getTEBreachType classifies a telemetry value against the thresholds in t and
// returns the matching limit constant, or "" when no limit is breached.
//
// Checks run from most to least severe on each side: upper-upper, upper,
// lower-lower, lower. A limit is only evaluated when it is greater than zero,
// and the comparisons are strict, so a value equal to a limit is not a breach.
func getTEBreachType(value float64, t teEventThresholds) string {
	switch {
	case t.upup > 0 && value > t.upup:
		return constants.TelemetryUpUpLimit
	case t.up > 0 && value > t.up:
		return constants.TelemetryUpLimit
	case t.downdown > 0 && value < t.downdown:
		return constants.TelemetryDownDownLimit
	case t.down > 0 && value < t.down:
		return constants.TelemetryDownLimit
	default:
		return ""
	}
}
// TEAnalyzer stores the thresholds required for telemetry analysis. Its
// pointer type provides the AnalyzeAndTriggerEvent method of RealTimeAnalyzer.
type TEAnalyzer struct {
// Thresholds is handed unchanged to analyzeTEDataLogic on every analysis call.
Thresholds teEventThresholds
}
2025-11-18 16:46:47 +08:00
// AnalyzeAndTriggerEvent implements the RealTimeAnalyzer interface for telemetry data.
2025-11-17 16:39:26 +08:00
func (a *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
	// All of the work is done by the shared telemetry routine; the analyzer
	// only contributes the thresholds it stores.
	thresholds := a.Thresholds
	analyzeTEDataLogic(ctx, conf, thresholds, realTimeValues)
}
2025-11-18 16:46:47 +08:00
// analyzeTEDataLogic define func to processing telemetry data and event triggering
2025-11-17 16:39:26 +08:00
func analyzeTEDataLogic ( ctx context . Context , conf * ComputeConfig , thresholds teEventThresholds , realTimeValues [ ] float64 ) {
windowSize := conf . minBreachCount
2026-02-26 16:48:12 +08:00
dataLen := len ( realTimeValues )
if dataLen < windowSize || windowSize <= 0 {
2025-11-17 16:39:26 +08:00
return
}
2026-02-26 16:48:12 +08:00
statusArray := make ( [ ] string , dataLen )
for i , val := range realTimeValues {
statusArray [ i ] = getTEBreachType ( val , thresholds )
2025-11-17 16:39:26 +08:00
}
2026-02-26 16:48:12 +08:00
breachTriggers := make ( map [ string ] teBreachTrigger )
for i := 0 ; i <= dataLen - windowSize ; i ++ {
firstBreachType := statusArray [ i ]
2025-11-17 16:39:26 +08:00
2026-02-26 16:48:12 +08:00
// if the first value in the window does not breach, skip this window directly
if firstBreachType == "" {
2025-11-17 16:39:26 +08:00
continue
}
allMatch := true
for j := 1 ; j < windowSize ; j ++ {
2026-02-26 16:48:12 +08:00
if statusArray [ i + j ] != firstBreachType {
2025-11-17 16:39:26 +08:00
allMatch = false
break
}
}
if allMatch {
2026-02-26 16:48:12 +08:00
triggerValues := realTimeValues [ i : i + windowSize ]
2025-11-17 16:39:26 +08:00
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
2026-02-26 16:48:12 +08:00
_ , exists := breachTriggers [ firstBreachType ]
if ! exists {
logger . Warn ( ctx , "event triggered by sliding window" ,
"breach_type" , firstBreachType ,
"trigger_values" , triggerValues )
// build Options
opts := [ ] event . EventOption {
event . WithConditionValue ( triggerValues , conf . Cause ) ,
event . WithTEAnalysisResult ( firstBreachType ) ,
2026-02-28 17:38:33 +08:00
event . WithCategory ( constants . EventUpDownRoutingKey ) ,
2026-02-26 16:48:12 +08:00
// TODO 生成 operations并考虑如何放入 event 中
// event.WithOperations(nil)
}
breachTriggers [ firstBreachType ] = teBreachTrigger {
breachType : firstBreachType ,
triggered : false ,
triggeredValues : triggerValues ,
eventOpts : opts ,
}
2025-11-17 16:39:26 +08:00
}
}
}
2026-02-26 16:48:12 +08:00
for breachType , trigger := range breachTriggers {
// trigger Action
command , mainBody := genTEEventCommandAndMainBody ( ctx , conf . Action )
eventName := fmt . Sprintf ( "telemetry_%s_%s_Breach_Event" , mainBody , breachType )
event . TriggerEventAction ( ctx , command , eventName , trigger . eventOpts ... )
2025-11-18 16:46:47 +08:00
}
}
2026-02-26 16:48:12 +08:00
func genTEEventCommandAndMainBody ( ctx context . Context , action map [ string ] any ) ( command string , mainBody string ) {
2025-11-18 16:46:47 +08:00
cmdValue , exist := action [ "command" ]
if ! exist {
2025-12-01 17:22:29 +08:00
logger . Error ( ctx , "can not find command variable into action map" , "action" , action )
2025-11-18 16:46:47 +08:00
return "" , ""
}
commandStr , ok := cmdValue . ( string )
if ! ok {
2025-12-01 17:22:29 +08:00
logger . Error ( ctx , "convert command to string type failed" , "command" , cmdValue , "type" , fmt . Sprintf ( "%T" , cmdValue ) )
2025-11-18 16:46:47 +08:00
return "" , ""
2025-11-17 16:39:26 +08:00
}
2025-11-18 16:46:47 +08:00
command = commandStr
2025-12-01 17:22:29 +08:00
paramsValue , exist := action [ "parameters" ]
2025-11-18 16:46:47 +08:00
if ! exist {
2025-12-01 17:22:29 +08:00
logger . Error ( ctx , "can not find parameters variable into action map" , "action" , action )
2025-11-18 16:46:47 +08:00
return command , ""
}
2025-12-01 17:22:29 +08:00
parameterSlice , ok := paramsValue . ( [ ] any )
2025-11-18 16:46:47 +08:00
if ! ok {
2025-12-01 17:22:29 +08:00
logger . Error ( ctx , "convert parameters to []any type failed" , "parameters" , paramsValue , "type" , fmt . Sprintf ( "%T" , paramsValue ) )
2025-11-18 16:46:47 +08:00
return command , ""
}
var builder strings . Builder
for i , parameter := range parameterSlice {
if i > 0 {
builder . WriteString ( "," )
}
2025-12-01 17:22:29 +08:00
parameterStr , ok := parameter . ( string )
if ! ok {
logger . Warn ( ctx , "parameter type is incorrect, skip this parameter" , "parameter" , parameter , "type" , fmt . Sprintf ( "%T" , parameter ) )
continue
}
builder . WriteString ( parameterStr )
2025-11-18 16:46:47 +08:00
}
return command , builder . String ( )
2025-11-17 16:39:26 +08:00
}
// tiEventThresholds stores the telesignal trigger condition parsed from the
// cause field of the configuration.
type tiEventThresholds struct {
// edge is the signal transition that triggers an event; parseTIThresholds
// only accepts "raising" or "falling".
edge string
// isFloatCause is left false by parseTIThresholds.
isFloatCause bool
}
2026-02-26 16:48:12 +08:00
// parseTIThresholds parses the telesignal thresholds from the cause map.
2025-11-17 16:39:26 +08:00
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
	// The "edge" entry is mandatory, must be a string, and must be either
	// "raising" or "falling". Each violation is reported with its own error
	// and the zero value. On success only the edge field is populated.
	const edgeKey = "edge"

	raw, found := cause[edgeKey]
	if !found {
		return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
	}

	edge, isString := raw.(string)
	if !isString {
		return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, raw)
	}

	if edge != "raising" && edge != "falling" {
		return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, edge)
	}

	return tiEventThresholds{edge: edge}, nil
}
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
func getTIBreachType ( currentValue float64 , previousValue float64 , t tiEventThresholds ) string {
2026-02-25 17:14:25 +08:00
switch t . edge {
case constants . TelesignalRaising :
2025-11-17 16:39:26 +08:00
if previousValue == 0.0 && currentValue == 1.0 {
return constants . TIBreachTriggerType
}
2026-02-25 17:14:25 +08:00
case constants . TelesignalFalling :
2025-11-17 16:39:26 +08:00
if previousValue == 1.0 && currentValue == 0.0 {
return constants . TIBreachTriggerType
}
}
return ""
}
// TIAnalyzer stores the thresholds required for telesignal (remote signaling)
// analysis. Its pointer type provides the AnalyzeAndTriggerEvent method of
// RealTimeAnalyzer.
type TIAnalyzer struct {
// Thresholds is handed unchanged to analyzeTIDataLogic on every analysis call.
Thresholds tiEventThresholds
}
2025-11-18 16:46:47 +08:00
// AnalyzeAndTriggerEvent implements the RealTimeAnalyzer interface for telesignal data.
2025-11-17 16:39:26 +08:00
func (a *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
	// All of the work is done by the shared telesignal routine; the analyzer
	// only contributes the thresholds it stores.
	thresholds := a.Thresholds
	analyzeTIDataLogic(ctx, conf, thresholds, realTimeValues)
}
2025-11-18 16:46:47 +08:00
// analyzeTIDataLogic define func to processing telesignal data and event triggering
2025-11-17 16:39:26 +08:00
func analyzeTIDataLogic ( ctx context . Context , conf * ComputeConfig , thresholds tiEventThresholds , realTimeValues [ ] float64 ) {
windowSize := conf . minBreachCount
if windowSize <= 0 {
logger . Error ( ctx , "variable minBreachCount is invalid or zero, analysis skipped" , "minBreachCount" , windowSize )
return
}
numDataPoints := len ( realTimeValues )
if numDataPoints < 2 {
logger . Info ( ctx , "data points less than 2, no change event possible, analysis skipped" , "data_points" , numDataPoints )
return
}
// pre calculate the change event type for all adjacent point pairs
numChanges := numDataPoints - 1
changeBreachTypes := make ( [ ] string , numChanges )
for i := range numChanges {
previousValue := realTimeValues [ i ]
currentValue := realTimeValues [ i + 1 ]
changeBreachTypes [ i ] = getTIBreachType ( currentValue , previousValue , thresholds )
}
if numChanges < windowSize {
logger . Error ( ctx , "number of change events is less than window size, analysis skipped" , "num_changes" , numChanges , "window_size" , windowSize )
return
}
var eventTriggered bool
breachTriggers := map [ string ] bool {
constants . TIBreachTriggerType : false ,
}
for i := 0 ; i <= numChanges - windowSize ; i ++ {
windowBreachTypes := changeBreachTypes [ i : i + windowSize ]
firstBreachType := windowBreachTypes [ 0 ]
if firstBreachType == "" {
continue
}
allMatch := true
for j := 1 ; j < windowSize ; j ++ {
if windowBreachTypes [ j ] != firstBreachType {
allMatch = false
break
}
}
if allMatch {
if ! breachTriggers [ firstBreachType ] {
finalValueIndex := i + windowSize
logger . Warn ( ctx , "event triggered by sliding window" , "breach_type" , firstBreachType , "value" , realTimeValues [ finalValueIndex ] )
breachTriggers [ firstBreachType ] = true
eventTriggered = true
}
}
}
if eventTriggered {
2026-02-26 16:48:12 +08:00
command , mainBody := genTIEventCommandAndMainBody ( conf . Action )
if command == "" || mainBody == "" {
logger . Error ( ctx , "generate telemetry evnet command or content failed" , "action" , conf . Action , "command" , command , "main_body" , mainBody )
2025-11-18 16:46:47 +08:00
return
}
2026-02-26 16:48:12 +08:00
event . TriggerEventAction ( ctx , command , mainBody )
2025-11-18 16:46:47 +08:00
return
}
}
2026-02-26 16:48:12 +08:00
// genTIEventCommandAndMainBody extracts the event command and main body for a
// telesignal event from the action map.
//
// action["command"] must be a string, otherwise ("", "") is returned. The
// parameters are read from action["parameters"]. They may be a []any, as
// produced when the action is decoded from JSON, or a []string. Non-string
// elements of a []any are skipped. The main body is the comma-separated list
// of the parameters. (command, "") is returned when the parameters are
// missing or have another type.
//
// Earlier versions only read the misspelled key "parametes", which made the
// main body empty for every correctly written action. That key is still
// honoured as a fallback when "parameters" is absent.
func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) {
	cmdValue, exist := action["command"]
	if !exist {
		return "", ""
	}
	commandStr, ok := cmdValue.(string)
	if !ok {
		return "", ""
	}
	command = commandStr

	paramsValue, exist := action["parameters"]
	if !exist {
		// legacy spelling kept for backward compatibility
		paramsValue, exist = action["parametes"]
	}
	if !exist {
		return command, ""
	}

	var parameters []string
	switch params := paramsValue.(type) {
	case []string:
		parameters = params
	case []any:
		parameters = make([]string, 0, len(params))
		for _, parameter := range params {
			// No context is available here for logging, so elements of the
			// wrong type are dropped silently.
			if parameterStr, ok := parameter.(string); ok {
				parameters = append(parameters, parameterStr)
			}
		}
	default:
		return command, ""
	}
	return command, strings.Join(parameters, ",")
}