Compare commits
18 Commits
develop
...
feature/ba
| Author | SHA1 | Date |
|---|---|---|
|
|
4b52e5f3c6 | |
|
|
f6bb3fb985 | |
|
|
2ececc38d9 | |
|
|
6c9da6fcd4 | |
|
|
56b9999d6b | |
|
|
1c385ee60d | |
|
|
6618209bcc | |
|
|
581153ed8d | |
|
|
f45b7d5fa4 | |
|
|
9be984899c | |
|
|
35cb969a54 | |
|
|
02e0c9c31a | |
|
|
2126aa7b06 | |
|
|
3374eec047 | |
|
|
3ff29cc072 | |
|
|
617d21500e | |
|
|
1a1727adab | |
|
|
fd2b202037 |
|
|
@ -27,3 +27,4 @@ go.work
|
|||
/log/
|
||||
# Shield config files in the configs folder
|
||||
/configs/**/*.yaml
|
||||
/configs/**/*.pem
|
||||
|
|
|
|||
|
|
@ -16,6 +16,12 @@ var (
|
|||
|
||||
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
||||
// ErrSubTargetRepeat define variable to indicates subscription target already exist in list
|
||||
ErrSubTargetRepeat = newError(40005, "subscription target already exist in list")
|
||||
// ErrSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
||||
ErrSubTargetNotFound = newError(40006, "found measuremnet by subscription target failed")
|
||||
// ErrCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
||||
ErrCancelSubTargetMissing = newError(40007, "cancel a not exist subscription target")
|
||||
|
||||
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
|
||||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||
|
|
|
|||
|
|
@ -44,10 +44,11 @@ var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
|
|||
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
|
||||
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
|
||||
if componentType == constants.DemoType {
|
||||
if anchorName == "voltage" {
|
||||
switch anchorName {
|
||||
case "voltage":
|
||||
resistance := componentData["resistance"].(float64)
|
||||
return baseVoltageFunc, []float64{resistance}
|
||||
} else if anchorName == "current" {
|
||||
case "current":
|
||||
resistance := componentData["resistance"].(float64)
|
||||
return baseCurrentFunc, []float64{resistance}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,21 @@ type ServiceConfig struct {
|
|||
ServiceAddr string `mapstructure:"service_addr"`
|
||||
ServiceName string `mapstructure:"service_name"`
|
||||
SecretKey string `mapstructure:"secret_key"`
|
||||
DeployEnv string `mapstructure:"deploy_env"`
|
||||
}
|
||||
|
||||
// RabbitMQConfig define config struct of RabbitMQ config
|
||||
type RabbitMQConfig struct {
|
||||
CACertPath string `mapstructure:"ca_cert_path"`
|
||||
ClientKeyPath string `mapstructure:"client_key_path"`
|
||||
ClientKeyPassword string `mapstructure:"client_key_password"`
|
||||
ClientCertPath string `mapstructure:"client_cert_path"`
|
||||
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
|
||||
ServerName string `mapstructure:"server_name"`
|
||||
User string `mapstructure:"user"`
|
||||
Password string `mapstructure:"password"`
|
||||
Host string `mapstructure:"host"`
|
||||
Port int `mapstructure:"port"`
|
||||
}
|
||||
|
||||
// KafkaConfig define config struct of kafka config
|
||||
|
|
@ -57,7 +72,9 @@ type RedisConfig struct {
|
|||
Password string `mapstructure:"password"`
|
||||
DB int `mapstructure:"db"`
|
||||
PoolSize int `mapstructure:"poolsize"`
|
||||
Timeout int `mapstructure:"timeout"`
|
||||
DialTimeout int `mapstructure:"dial_timeout"`
|
||||
ReadTimeout int `mapstructure:"read_timeout"`
|
||||
WriteTimeout int `mapstructure:"write_timeout"`
|
||||
}
|
||||
|
||||
// AntsConfig define config struct of ants pool config
|
||||
|
|
@ -79,6 +96,7 @@ type ModelRTConfig struct {
|
|||
BaseConfig `mapstructure:"base"`
|
||||
ServiceConfig `mapstructure:"service"`
|
||||
PostgresConfig `mapstructure:"postgres"`
|
||||
RabbitMQConfig `mapstructure:"rabbitmq"`
|
||||
KafkaConfig `mapstructure:"kafka"`
|
||||
LoggerConfig `mapstructure:"logger"`
|
||||
AntsConfig `mapstructure:"ants"`
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// CodeSuccess define constant to indicates that the API was successfully processed
|
||||
CodeSuccess = 20000
|
||||
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
||||
CodeInvalidParamFailed = 40001
|
||||
// CodeDBQueryFailed define constant to indicates database query operation failed
|
||||
CodeDBQueryFailed = 50001
|
||||
// CodeDBUpdateailed define constant to indicates database update operation failed
|
||||
CodeDBUpdateailed = 50002
|
||||
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
||||
CodeRedisQueryFailed = 60001
|
||||
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
||||
CodeRedisUpdateFailed = 60002
|
||||
)
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// CodeSuccess define constant to indicates that the API was successfully processed
|
||||
CodeSuccess = 20000
|
||||
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
||||
CodeInvalidParamFailed = 40001
|
||||
// CodeFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||
CodeFoundTargetFailed = 40004
|
||||
// CodeSubTargetRepeat define variable to indicates subscription target already exist in list
|
||||
CodeSubTargetRepeat = 40005
|
||||
// CodeSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
||||
CodeSubTargetNotFound = 40006
|
||||
// CodeCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
||||
CodeCancelSubTargetMissing = 40007
|
||||
// CodeUpdateSubTargetMissing define variable to indicates update a not exist subscription target
|
||||
CodeUpdateSubTargetMissing = 40008
|
||||
// CodeAppendSubTargetMissing define variable to indicates append a not exist subscription target
|
||||
CodeAppendSubTargetMissing = 40009
|
||||
// CodeUnsupportSubOperation define variable to indicates append a not exist subscription target
|
||||
CodeUnsupportSubOperation = 40010
|
||||
// CodeDBQueryFailed define constant to indicates database query operation failed
|
||||
CodeDBQueryFailed = 50001
|
||||
// CodeDBUpdateailed define constant to indicates database update operation failed
|
||||
CodeDBUpdateailed = 50002
|
||||
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
||||
CodeRedisQueryFailed = 60001
|
||||
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
||||
CodeRedisUpdateFailed = 60002
|
||||
)
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// DevelopmentDeployMode define development operator environment for modelRT project
|
||||
DevelopmentDeployMode = "development"
|
||||
// DebugDeployMode define debug operator environment for modelRT project
|
||||
DebugDeployMode = "debug"
|
||||
// ProductionDeployMode define production operator environment for modelRT project
|
||||
ProductionDeployMode = "production"
|
||||
)
|
||||
|
|
@ -1,31 +1,83 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
// EvenvtType define event type
|
||||
type EvenvtType int
|
||||
|
||||
const (
|
||||
// TIBreachTriggerType define out of bounds type constant
|
||||
TIBreachTriggerType = "trigger"
|
||||
// EventGeneralHard define gereral hard event type
|
||||
EventGeneralHard EvenvtType = iota
|
||||
// EventGeneralPlatformSoft define gereral platform soft event type
|
||||
EventGeneralPlatformSoft
|
||||
// EventGeneralApplicationSoft define gereral application soft event type
|
||||
EventGeneralApplicationSoft
|
||||
// EventWarnHard define warn hard event type
|
||||
EventWarnHard
|
||||
// EventWarnPlatformSoft define warn platform soft event type
|
||||
EventWarnPlatformSoft
|
||||
// EventWarnApplicationSoft define warn application soft event type
|
||||
EventWarnApplicationSoft
|
||||
// EventCriticalHard define critical hard event type
|
||||
EventCriticalHard
|
||||
// EventCriticalPlatformSoft define critical platform soft event type
|
||||
EventCriticalPlatformSoft
|
||||
// EventCriticalApplicationSoft define critical application soft event type
|
||||
EventCriticalApplicationSoft
|
||||
)
|
||||
|
||||
// IsGeneral define fucn to check event type is general
|
||||
func IsGeneral(eventType EvenvtType) bool {
|
||||
return eventType < 3
|
||||
}
|
||||
|
||||
// IsWarning define fucn to check event type is warn
|
||||
func IsWarning(eventType EvenvtType) bool {
|
||||
return eventType >= 3 && eventType <= 5
|
||||
}
|
||||
|
||||
// IsCritical define fucn to check event type is critical
|
||||
func IsCritical(eventType EvenvtType) bool {
|
||||
return eventType >= 6
|
||||
}
|
||||
|
||||
const (
|
||||
// EventFromStation define event from station type
|
||||
EventFromStation = "station"
|
||||
// EventFromPlatform define event from platform type
|
||||
EventFromPlatform = "platform"
|
||||
// EventFromOthers define event from others type
|
||||
EventFromOthers = "others"
|
||||
)
|
||||
|
||||
const (
|
||||
// TelemetryUpLimit define telemetry upper limit
|
||||
TelemetryUpLimit = "up"
|
||||
// TelemetryUpUpLimit define telemetry upper upper limit
|
||||
TelemetryUpUpLimit = "upup"
|
||||
|
||||
// TelemetryDownLimit define telemetry limit
|
||||
TelemetryDownLimit = "down"
|
||||
// TelemetryDownDownLimit define telemetry lower lower limit
|
||||
TelemetryDownDownLimit = "downdown"
|
||||
// EventStatusHappended define status for event record when event just happened, no data attached yet
|
||||
EventStatusHappended = iota
|
||||
// EventStatusDataAttached define status for event record when event just happened, data attached already
|
||||
EventStatusDataAttached
|
||||
// EventStatusReported define status for event record when event reported to CIM, no matter it's successful or failed
|
||||
EventStatusReported
|
||||
// EventStatusConfirmed define status for event record when event confirmed by CIM, no matter it's successful or failed
|
||||
EventStatusConfirmed
|
||||
// EventStatusPersisted define status for event record when event persisted in database, no matter it's successful or failed
|
||||
EventStatusPersisted
|
||||
// EventStatusClosed define status for event record when event closed, no matter it's successful or failed
|
||||
EventStatusClosed
|
||||
)
|
||||
|
||||
const (
|
||||
// TelesignalRaising define telesignal raising edge
|
||||
TelesignalRaising = "raising"
|
||||
// TelesignalFalling define telesignal falling edge
|
||||
TelesignalFalling = "falling"
|
||||
// EventExchangeName define exchange name for event alarm message
|
||||
EventExchangeName = "event-exchange"
|
||||
// EventDeadExchangeName define dead letter exchange name for event alarm message
|
||||
EventDeadExchangeName = "event-dead-letter-exchange"
|
||||
)
|
||||
|
||||
const (
|
||||
// MinBreachCount define min breach count of real time data
|
||||
MinBreachCount = 10
|
||||
// EventUpDownRoutingKey define routing key for up or down limit event alarm message
|
||||
EventUpDownRoutingKey = "event-up-down-routing-key"
|
||||
// EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message
|
||||
EventUpDownDeadRoutingKey = "event-up-down-dead-letter-routing-key"
|
||||
// EventUpDownQueueName define queue name for up or down limit event alarm message
|
||||
EventUpDownQueueName = "event-up-down-queue"
|
||||
// EventUpDownDeadQueueName define dead letter queue name for event alarm message
|
||||
EventUpDownDeadQueueName = "event-dead-letter-queue"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -12,29 +12,6 @@ const (
|
|||
SubUpdateAction string = "update"
|
||||
)
|
||||
|
||||
// 定义状态常量
|
||||
// TODO 从4位格式修改为5位格式
|
||||
const (
|
||||
// SubSuccessCode define subscription success code
|
||||
SubSuccessCode = "1001"
|
||||
// SubFailedCode define subscription failed code
|
||||
SubFailedCode = "1002"
|
||||
// RTDSuccessCode define real time data return success code
|
||||
RTDSuccessCode = "1003"
|
||||
// RTDFailedCode define real time data return failed code
|
||||
RTDFailedCode = "1004"
|
||||
// CancelSubSuccessCode define cancel subscription success code
|
||||
CancelSubSuccessCode = "1005"
|
||||
// CancelSubFailedCode define cancel subscription failed code
|
||||
CancelSubFailedCode = "1006"
|
||||
// SubRepeatCode define subscription repeat code
|
||||
SubRepeatCode = "1007"
|
||||
// UpdateSubSuccessCode define update subscription success code
|
||||
UpdateSubSuccessCode = "1008"
|
||||
// UpdateSubFailedCode define update subscription failed code
|
||||
UpdateSubFailedCode = "1009"
|
||||
)
|
||||
|
||||
const (
|
||||
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
|
||||
SysCtrlPrefix = "SYS_CTRL_"
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// TIBreachTriggerType define out of bounds type constant
|
||||
TIBreachTriggerType = "trigger"
|
||||
)
|
||||
|
||||
const (
|
||||
// TelemetryUpLimit define telemetry upper limit
|
||||
TelemetryUpLimit = "up"
|
||||
// TelemetryUpUpLimit define telemetry upper upper limit
|
||||
TelemetryUpUpLimit = "upup"
|
||||
|
||||
// TelemetryDownLimit define telemetry limit
|
||||
TelemetryDownLimit = "down"
|
||||
// TelemetryDownDownLimit define telemetry lower lower limit
|
||||
TelemetryDownDownLimit = "downdown"
|
||||
)
|
||||
|
||||
const (
|
||||
// TelesignalRaising define telesignal raising edge
|
||||
TelesignalRaising = "raising"
|
||||
// TelesignalFalling define telesignal falling edge
|
||||
TelesignalFalling = "falling"
|
||||
)
|
||||
|
||||
const (
|
||||
// MinBreachCount define min breach count of real time data
|
||||
MinBreachCount = 10
|
||||
)
|
||||
|
|
@ -53,7 +53,8 @@ func FillingLongTokenModel(ctx context.Context, tx *gorm.DB, identModel *model.L
|
|||
func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.IndentityTokenModelInterface, error) {
|
||||
identSlice := strings.Split(identToken, ".")
|
||||
identSliceLen := len(identSlice)
|
||||
if identSliceLen == 4 {
|
||||
switch identSliceLen {
|
||||
case 4:
|
||||
// token1.token2.token3.token4.token7
|
||||
shortIndentModel := &model.ShortIdentityTokenModel{
|
||||
GridTag: identSlice[0],
|
||||
|
|
@ -67,7 +68,7 @@ func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken strin
|
|||
return nil, err
|
||||
}
|
||||
return shortIndentModel, nil
|
||||
} else if identSliceLen == 7 {
|
||||
case 7:
|
||||
// token1.token2.token3.token4.token5.token6.token7
|
||||
longIndentModel := &model.LongIdentityTokenModel{
|
||||
GridTag: identSlice[0],
|
||||
|
|
|
|||
|
|
@ -19,7 +19,8 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
|
|||
|
||||
attrSlice := strings.Split(attrToken, ".")
|
||||
attrLen := len(attrSlice)
|
||||
if attrLen == 4 {
|
||||
switch attrLen {
|
||||
case 4:
|
||||
short := &model.ShortAttrInfo{
|
||||
AttrGroupName: attrSlice[2],
|
||||
AttrKey: attrSlice[3],
|
||||
|
|
@ -35,7 +36,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
|
|||
}
|
||||
short.AttrValue = attrValue
|
||||
return short, nil
|
||||
} else if attrLen == 7 {
|
||||
case 7:
|
||||
long := &model.LongAttrInfo{
|
||||
AttrGroupName: attrSlice[5],
|
||||
AttrKey: attrSlice[6],
|
||||
|
|
|
|||
|
|
@ -2,9 +2,7 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"modelRT/logger"
|
||||
|
||||
|
|
@ -15,29 +13,23 @@ import (
|
|||
var (
|
||||
postgresOnce sync.Once
|
||||
_globalPostgresClient *gorm.DB
|
||||
_globalPostgresMu sync.RWMutex
|
||||
)
|
||||
|
||||
// GetPostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
|
||||
func GetPostgresDBClient() *gorm.DB {
|
||||
_globalPostgresMu.RLock()
|
||||
client := _globalPostgresClient
|
||||
_globalPostgresMu.RUnlock()
|
||||
return client
|
||||
return _globalPostgresClient
|
||||
}
|
||||
|
||||
// InitPostgresDBInstance return instance of PostgresDB client
|
||||
func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
|
||||
func InitPostgresDBInstance(PostgresDBURI string) *gorm.DB {
|
||||
postgresOnce.Do(func() {
|
||||
_globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI)
|
||||
_globalPostgresClient = initPostgresDBClient(PostgresDBURI)
|
||||
})
|
||||
return _globalPostgresClient
|
||||
}
|
||||
|
||||
// initPostgresDBClient return successfully initialized PostgresDB client
|
||||
func initPostgresDBClient(ctx context.Context, PostgresDBURI string) *gorm.DB {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
func initPostgresDBClient(PostgresDBURI string) *gorm.DB {
|
||||
db, err := gorm.Open(postgres.Open(PostgresDBURI), &gorm.Config{Logger: logger.NewGormLogger()})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
|||
|
|
@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
|
|||
}
|
||||
}
|
||||
|
||||
// QueryByZRangeByLex define func to query real time data from redis zset
|
||||
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
||||
// QueryByZRange define func to query real time data from redis zset
|
||||
func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
||||
client := rc.Client
|
||||
args := redis.ZRangeArgs{
|
||||
Key: key,
|
||||
|
|
|
|||
|
|
@ -16,13 +16,15 @@ var (
|
|||
)
|
||||
|
||||
// initClient define func of return successfully initialized redis client
|
||||
func initClient(rCfg config.RedisConfig) *redis.Client {
|
||||
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||
client, err := util.NewRedisClient(
|
||||
rCfg.Addr,
|
||||
util.WithPassword(rCfg.Password),
|
||||
util.WithPassword(rCfg.Password, deployEnv),
|
||||
util.WithDB(rCfg.DB),
|
||||
util.WithPoolSize(rCfg.PoolSize),
|
||||
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
@ -31,9 +33,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
|
|||
}
|
||||
|
||||
// InitRedisClientInstance define func of return instance of redis client
|
||||
func InitRedisClientInstance(rCfg config.RedisConfig) *redis.Client {
|
||||
func InitRedisClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||
once.Do(func() {
|
||||
_globalStorageClient = initClient(rCfg)
|
||||
_globalStorageClient = initClient(rCfg, deployEnv)
|
||||
})
|
||||
return _globalStorageClient
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,13 +16,15 @@ var (
|
|||
)
|
||||
|
||||
// initClient define func of return successfully initialized redis client
|
||||
func initClient(rCfg config.RedisConfig) *redis.Client {
|
||||
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||
client, err := util.NewRedisClient(
|
||||
rCfg.Addr,
|
||||
util.WithPassword(rCfg.Password),
|
||||
util.WithPassword(rCfg.Password, deployEnv),
|
||||
util.WithDB(rCfg.DB),
|
||||
util.WithPoolSize(rCfg.PoolSize),
|
||||
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
@ -31,9 +33,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
|
|||
}
|
||||
|
||||
// InitClientInstance define func of return instance of redis client
|
||||
func InitClientInstance(rCfg config.RedisConfig) *redis.Client {
|
||||
func InitClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||
once.Do(func() {
|
||||
_globalLockerClient = initClient(rCfg)
|
||||
_globalLockerClient = initClient(rCfg, deployEnv)
|
||||
})
|
||||
return _globalLockerClient
|
||||
}
|
||||
|
|
|
|||
4
go.mod
4
go.mod
|
|
@ -13,14 +13,15 @@ require (
|
|||
github.com/json-iterator/go v1.1.12
|
||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||
github.com/panjf2000/ants/v2 v2.10.0
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/redis/go-redis/v9 v9.7.3
|
||||
github.com/spf13/viper v1.19.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/swaggo/files v1.0.1
|
||||
github.com/swaggo/gin-swagger v1.6.0
|
||||
github.com/swaggo/swag v1.16.4
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/sys v0.28.0
|
||||
gorm.io/driver/mysql v1.5.7
|
||||
gorm.io/driver/postgres v1.5.9
|
||||
gorm.io/gorm v1.25.12
|
||||
|
|
@ -81,6 +82,7 @@ require (
|
|||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
||||
golang.org/x/net v0.32.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
golang.org/x/sys v0.28.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
golang.org/x/tools v0.28.0 // indirect
|
||||
google.golang.org/protobuf v1.35.2 // indirect
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -121,6 +121,8 @@ github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xl
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
|
||||
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
|
||||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
||||
|
|
@ -162,6 +164,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
|
|||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
|
||||
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
|
|
|
|||
|
|
@ -5,10 +5,10 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
"modelRT/real-time-data/alert"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -30,3 +30,14 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
|
|||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
|
||||
resp := network.WSResponse{
|
||||
Code: code,
|
||||
Msg: msg,
|
||||
}
|
||||
if payload != nil {
|
||||
resp.Payload = payload
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,10 +6,10 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
"modelRT/real-time-data/alert"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -39,20 +39,14 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
if clientID == "" {
|
||||
err := fmt.Errorf("clientID is missing from the path")
|
||||
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
|
||||
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
|
@ -60,9 +54,18 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||
defer cancel()
|
||||
|
||||
conn.SetCloseHandler(func(code int, text string) error {
|
||||
logger.Info(c.Request.Context(), "websocket processor shutdown trigger",
|
||||
"clientID", clientID, "code", code, "reason", text)
|
||||
|
||||
// call cancel to notify other goroutines to stop working
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
|
||||
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
|
||||
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
|
||||
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
|
||||
|
||||
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
|
||||
go readClientMessages(ctx, conn, clientID, cancel)
|
||||
|
|
@ -79,52 +82,33 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
select {
|
||||
case targetData, ok := <-fanInChan:
|
||||
if !ok {
|
||||
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
|
||||
sendChan <- network.WSResponse{
|
||||
Code: constants.RespCodeServerError,
|
||||
Msg: "abnormal shutdown of data fan-in channel",
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
buffer = append(buffer, targetData)
|
||||
|
||||
if len(buffer) >= bufferMaxSize {
|
||||
// buffer is full, send immediately
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
|
||||
}
|
||||
|
||||
// reset buffer
|
||||
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||
// reset the ticker to prevent it from triggering immediately after the ticker is sent
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
|
||||
ticker.Reset(sendMaxInterval)
|
||||
}
|
||||
case <-ticker.C:
|
||||
if len(buffer) > 0 {
|
||||
// when the ticker is triggered, all data in the send buffer is sent
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
|
||||
}
|
||||
|
||||
// reset buffer
|
||||
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// send the last remaining data
|
||||
// last refresh before exiting
|
||||
if len(buffer) > 0 {
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
|
||||
}
|
||||
}
|
||||
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
|
||||
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
|
||||
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
|
||||
// conn.SetReadLimit(512)
|
||||
for {
|
||||
|
|
@ -149,54 +133,47 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
|
|||
}
|
||||
}
|
||||
|
||||
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
|
||||
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
|
||||
if len(targetsData) == 0 {
|
||||
return nil
|
||||
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
|
||||
if len(*buffer) == 0 {
|
||||
return
|
||||
}
|
||||
response := network.SuccessResponse{
|
||||
Code: 200,
|
||||
Msg: "success",
|
||||
|
||||
resp := network.WSResponse{
|
||||
Code: constants.RespCodeSuccess,
|
||||
Msg: "process completed",
|
||||
Payload: network.RealTimePullPayload{
|
||||
Targets: targetsData,
|
||||
Targets: *buffer,
|
||||
},
|
||||
}
|
||||
return conn.WriteJSON(response)
|
||||
|
||||
select {
|
||||
case sendChan <- resp:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
|
||||
}
|
||||
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
|
||||
}
|
||||
|
||||
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
|
||||
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
|
||||
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
||||
for targetsData := range sendChan {
|
||||
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
||||
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
|
||||
err := conn.WriteJSON(map[string]any{
|
||||
"code": 2101,
|
||||
"msg": "all targets removed in given client_id",
|
||||
"payload": map[string]int{
|
||||
"active_targets_count": 0,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
|
||||
cancel()
|
||||
}
|
||||
continue
|
||||
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
|
||||
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
|
||||
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
||||
for resp := range sendChan {
|
||||
if err := conn.WriteJSON(resp); err != nil {
|
||||
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
|
||||
}
|
||||
|
||||
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
|
||||
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
|
||||
// ensure the fanInChan will not leak
|
||||
defer close(fanInChan)
|
||||
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
|
||||
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
|
||||
stopChanMap := make(map[string]chan struct{})
|
||||
s.globalMutex.RLock()
|
||||
|
|
@ -383,7 +360,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
|
|||
}
|
||||
|
||||
// removeTargets define func to stops running polling goroutines for targets that were removed
|
||||
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
|
||||
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
|
||||
for _, target := range removeTargets {
|
||||
stopChan, exists := stopChanMap[target]
|
||||
if !exists {
|
||||
|
|
@ -402,17 +379,18 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
|
|||
}
|
||||
}
|
||||
|
||||
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
|
||||
specialTarget := network.RealTimePullTarget{
|
||||
ID: constants.SysCtrlAllRemoved,
|
||||
Datas: []network.RealTimePullData{},
|
||||
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
|
||||
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
||||
resp := network.WSResponse{
|
||||
Code: constants.RespCodeSuccessWithNoSub,
|
||||
Msg: "all targets removed",
|
||||
Payload: map[string]int{"active_targets_count": 0},
|
||||
}
|
||||
|
||||
select {
|
||||
case sendChan <- []network.RealTimePullTarget{specialTarget}:
|
||||
logger.Info(ctx, "sent 2101 status request to sendChan")
|
||||
case sendChan <- resp:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
|
||||
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -423,7 +401,6 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
|
|||
close(stopChan)
|
||||
}
|
||||
clear(stopChanMap)
|
||||
return
|
||||
}
|
||||
|
||||
// redisPollingConfig define struct for param which query real time data from redis
|
||||
|
|
@ -463,7 +440,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
|
|||
}
|
||||
|
||||
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
|
||||
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
|
||||
members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -168,7 +168,6 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran
|
|||
}
|
||||
transportChannel <- subPoss
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// messageTypeToString define func of auxiliary to convert message type to string
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"modelRT/constants"
|
||||
|
|
@ -33,42 +32,42 @@ func init() {
|
|||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
|
||||
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
// @Success 2000 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
//
|
||||
// @Example 200 {
|
||||
// "code": 200,
|
||||
// "msg": "success",
|
||||
// @Example 2000 {
|
||||
// "code": 2000,
|
||||
// "msg": "process completed",
|
||||
// "payload": {
|
||||
// "targets": [
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
|
||||
// "code": "1001",
|
||||
// "code": "20000",
|
||||
// "msg": "subscription success"
|
||||
// },
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||
// "code": "1002",
|
||||
// "code": "20000",
|
||||
// "msg": "subscription failed"
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
// @Failure 3000 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
//
|
||||
// @Example 400 {
|
||||
// "code": 400,
|
||||
// "msg": "failed to get recommend data from redis",
|
||||
// @Example 3000 {
|
||||
// "code": 3000,
|
||||
// "msg": "process completed with partial failures",
|
||||
// "payload": {
|
||||
// "targets": [
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
|
||||
// "code": "1002",
|
||||
// "code": "40005",
|
||||
// "msg": "subscription failed"
|
||||
// },
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||
// "code": "1002",
|
||||
// "code": "50001",
|
||||
// "msg": "subscription failed"
|
||||
// }
|
||||
// ]
|
||||
|
|
@ -83,10 +82,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
|
||||
if err := c.ShouldBindJSON(&request); err != nil {
|
||||
logger.Error(c, "failed to unmarshal real time query request", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -95,10 +91,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to generate client id", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
clientID = id.String()
|
||||
|
|
@ -123,110 +116,74 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "create real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
case constants.SubStopAction:
|
||||
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
case constants.SubAppendAction:
|
||||
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "append target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
case constants.SubUpdateAction:
|
||||
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "update target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
default:
|
||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
|
@ -283,12 +240,12 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
|
|||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||
targetResult.Code = constants.SubFailedCode
|
||||
targetResult.Code = constants.CodeFoundTargetFailed
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
}
|
||||
targetResult.Code = constants.SubSuccessCode
|
||||
targetResult.Code = constants.CodeSuccess
|
||||
targetResult.Msg = constants.SubSuccessMsg
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
successfulTargets = append(successfulTargets, target)
|
||||
|
|
@ -327,7 +284,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
|||
if _, exist := config.targetContext[target]; !exist {
|
||||
err := fmt.Errorf("target %s does not exists in subscription list", target)
|
||||
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
|
||||
targetResult.Code = constants.UpdateSubFailedCode
|
||||
targetResult.Code = constants.CodeUpdateSubTargetMissing
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
|
|
@ -336,13 +293,13 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
|||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||
targetResult.Code = constants.UpdateSubFailedCode
|
||||
targetResult.Code = constants.CodeDBQueryFailed
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
}
|
||||
|
||||
targetResult.Code = constants.UpdateSubSuccessCode
|
||||
targetResult.Code = constants.CodeSuccess
|
||||
targetResult.Msg = constants.UpdateSubSuccessMsg
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
successfulTargets = append(successfulTargets, target)
|
||||
|
|
@ -473,7 +430,7 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
|
|||
if !exist {
|
||||
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
|
||||
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeAppendSubTargetMissing, err), err
|
||||
}
|
||||
|
||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
|
||||
|
|
@ -507,7 +464,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS
|
|||
|
||||
for index := range resultsSlice {
|
||||
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
|
||||
resultsSlice[index].Code = constants.SubRepeatCode
|
||||
resultsSlice[index].Code = constants.CodeSubTargetRepeat
|
||||
resultsSlice[index].Msg = constants.SubRepeatMsg
|
||||
}
|
||||
}
|
||||
|
|
@ -575,7 +532,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
s.globalMutex.RUnlock()
|
||||
err := fmt.Errorf("clientID %s not found", clientID)
|
||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeCancelSubTargetMissing, err), err
|
||||
}
|
||||
s.globalMutex.RUnlock()
|
||||
|
||||
|
|
@ -595,7 +552,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
for _, target := range measTargets {
|
||||
targetResult := network.TargetResult{
|
||||
ID: target,
|
||||
Code: constants.CancelSubFailedCode,
|
||||
Code: constants.CodeCancelSubTargetMissing,
|
||||
Msg: constants.CancelSubFailedMsg,
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -616,7 +573,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
|
||||
targetResult := network.TargetResult{
|
||||
ID: existingTarget,
|
||||
Code: constants.CancelSubSuccessCode,
|
||||
Code: constants.CodeSuccess,
|
||||
Msg: constants.CancelSubSuccessMsg,
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -639,7 +596,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
for target := range targetsToRemoveMap {
|
||||
targetResult := network.TargetResult{
|
||||
ID: target,
|
||||
Code: constants.CancelSubFailedCode,
|
||||
Code: constants.CodeCancelSubTargetMissing,
|
||||
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -663,17 +620,15 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
// UpdateTargets define function to update targets in SharedSubState
|
||||
func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
|
||||
requestTargetsCount := processRealTimeRequestCount(measurements)
|
||||
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
||||
|
||||
s.globalMutex.RLock()
|
||||
config, exist := s.subMap[clientID]
|
||||
s.globalMutex.RUnlock()
|
||||
|
||||
if !exist {
|
||||
s.globalMutex.RUnlock()
|
||||
err := fmt.Errorf("clientID %s not found", clientID)
|
||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err
|
||||
}
|
||||
|
||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
|
||||
|
|
@ -722,13 +677,13 @@ func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem)
|
|||
return totalTargetsCount
|
||||
}
|
||||
|
||||
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
|
||||
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, businessCode int, err error) []network.TargetResult {
|
||||
targetProcessResults := make([]network.TargetResult, 0, targetCount)
|
||||
for _, measurementItem := range measurements {
|
||||
for _, target := range measurementItem.Targets {
|
||||
var targetResult network.TargetResult
|
||||
targetResult.ID = target
|
||||
targetResult.Code = constants.SubFailedCode
|
||||
targetResult.Code = businessCode
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
}
|
||||
|
|
|
|||
34
main.go
34
main.go
|
|
@ -13,13 +13,15 @@ import (
|
|||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/config"
|
||||
"modelRT/constants"
|
||||
"modelRT/database"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/model"
|
||||
"modelRT/mq"
|
||||
"modelRT/pool"
|
||||
"modelRT/real-time-data/alert"
|
||||
"modelRT/router"
|
||||
"modelRT/util"
|
||||
|
||||
|
|
@ -98,14 +100,14 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceConfig.ServiceName, modelRTConfig.ServiceConfig.SecretKey)
|
||||
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceName, modelRTConfig.SecretKey)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "generate client token failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// init postgresDBClient
|
||||
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
|
||||
postgresDBClient = database.InitPostgresDBInstance(modelRTConfig.PostgresDBURI)
|
||||
|
||||
defer func() {
|
||||
sqlDB, err := postgresDBClient.DB()
|
||||
|
|
@ -127,13 +129,17 @@ func main() {
|
|||
defer parsePool.Release()
|
||||
|
||||
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "init redigo pool failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
defer searchPool.Close()
|
||||
model.InitAutocompleterWithPool(searchPool)
|
||||
|
||||
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig)
|
||||
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig, modelRTConfig.DeployEnv)
|
||||
defer storageClient.Close()
|
||||
|
||||
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig)
|
||||
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig, modelRTConfig.DeployEnv)
|
||||
defer lockerClient.Close()
|
||||
|
||||
// init anchor param ants pool
|
||||
|
|
@ -144,6 +150,11 @@ func main() {
|
|||
}
|
||||
defer anchorRealTimePool.Release()
|
||||
|
||||
// init rabbitmq connection
|
||||
mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig)
|
||||
// async push event to rabbitMQ
|
||||
go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan)
|
||||
|
||||
postgresDBClient.Transaction(func(tx *gorm.DB) error {
|
||||
// load circuit diagram from postgres
|
||||
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
|
||||
|
|
@ -193,7 +204,7 @@ func main() {
|
|||
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement)
|
||||
go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement)
|
||||
|
||||
tree, err := database.QueryTopologicFromDB(ctx, tx)
|
||||
if err != nil {
|
||||
|
|
@ -204,8 +215,10 @@ func main() {
|
|||
return nil
|
||||
})
|
||||
|
||||
// use release mode in productio
|
||||
// gin.SetMode(gin.ReleaseMode)
|
||||
// use release mode in production
|
||||
if modelRTConfig.DeployEnv == constants.ProductionDeployMode {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
engine := gin.New()
|
||||
router.RegisterRoutes(engine, serviceToken)
|
||||
|
||||
|
|
@ -223,7 +236,7 @@ func main() {
|
|||
// }
|
||||
|
||||
server := http.Server{
|
||||
Addr: modelRTConfig.ServiceConfig.ServiceAddr,
|
||||
Addr: modelRTConfig.ServiceAddr,
|
||||
Handler: engine,
|
||||
}
|
||||
|
||||
|
|
@ -232,9 +245,12 @@ func main() {
|
|||
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-done
|
||||
logger.Info(ctx, "shutdown signal received, cleaning up...")
|
||||
if err := server.Shutdown(context.Background()); err != nil {
|
||||
logger.Error(ctx, "shutdown serverError", "err", err)
|
||||
}
|
||||
mq.CloseRabbitProxy()
|
||||
logger.Info(ctx, "resources cleaned up, exiting")
|
||||
}()
|
||||
|
||||
logger.Info(ctx, "starting ModelRT server")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,129 @@
|
|||
// Package mq provides read or write access to message queue services
|
||||
package mq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
|
||||
var MsgChan chan []byte
|
||||
|
||||
func init() {
|
||||
MsgChan = make(chan []byte, 10000)
|
||||
}
|
||||
|
||||
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
||||
var channel *amqp.Channel
|
||||
var err error
|
||||
|
||||
channel, err = GetConn().Channel()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
||||
}
|
||||
|
||||
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
||||
}
|
||||
|
||||
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
||||
}
|
||||
|
||||
err = channel.QueueBind(constants.EventUpDownDeadQueueName, constants.EventUpDownDeadRoutingKey, constants.EventDeadExchangeName, false, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
||||
}
|
||||
|
||||
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "declare event exchange failed", "error", err)
|
||||
}
|
||||
|
||||
args := amqp.Table{
|
||||
// messages that accumulate to the maximum number will be automatically transferred to the dead letter queue
|
||||
"x-max-length": int32(50),
|
||||
"x-dead-letter-exchange": constants.EventDeadExchangeName,
|
||||
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
|
||||
}
|
||||
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "declare event queue failed", "error", err)
|
||||
}
|
||||
|
||||
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err)
|
||||
}
|
||||
|
||||
if err := channel.Confirm(false); err != nil {
|
||||
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
||||
}
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
|
||||
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) {
|
||||
channel, err := initUpDownLimitEventChannel(ctx)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
||||
return
|
||||
}
|
||||
// TODO 使用配置修改确认模式通道参数
|
||||
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case confirm, ok := <-confirms:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !confirm.Ack {
|
||||
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
||||
channel.Close()
|
||||
return
|
||||
case msg, ok := <-msgChan:
|
||||
if !ok {
|
||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
||||
channel.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// send event alarm message to rabbitMQ queue
|
||||
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
err = channel.PublishWithContext(pubCtx,
|
||||
constants.EventExchangeName, // exchange
|
||||
constants.EventUpDownRoutingKey, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: msg,
|
||||
})
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", msg, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,217 @@
|
|||
// Package mq define message queue operation functions
|
||||
package mq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"modelRT/config"
|
||||
"modelRT/logger"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/youmark/pkcs8"
|
||||
)
|
||||
|
||||
var (
|
||||
_globalRabbitMQProxy *RabbitMQProxy
|
||||
rabbitMQOnce sync.Once
|
||||
)
|
||||
|
||||
// RabbitMQProxy define stuct of rabbitMQ connection proxy
|
||||
type RabbitMQProxy struct {
|
||||
tlsConf *tls.Config
|
||||
conn *amqp.Connection
|
||||
cancel context.CancelFunc
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
|
||||
type rabbitMQCertConf struct {
|
||||
serverName string
|
||||
insecureSkipVerify bool
|
||||
clientCert tls.Certificate
|
||||
caCertPool *x509.CertPool
|
||||
}
|
||||
|
||||
// GetConn define func to return the rabbitMQ connection
|
||||
func GetConn() *amqp.Connection {
|
||||
_globalRabbitMQProxy.mu.Lock()
|
||||
defer _globalRabbitMQProxy.mu.Unlock()
|
||||
return _globalRabbitMQProxy.conn
|
||||
}
|
||||
|
||||
// InitRabbitProxy return instance of rabbitMQ connection
|
||||
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
|
||||
amqpURI := generateRabbitMQURI(rCfg)
|
||||
tlsConf, err := initCertConf(rCfg)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
rabbitMQOnce.Do(func() {
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
|
||||
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
|
||||
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
|
||||
})
|
||||
return _globalRabbitMQProxy
|
||||
}
|
||||
|
||||
// initRabbitMQ return instance of rabbitMQ connection
|
||||
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
|
||||
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
|
||||
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
||||
TLSClientConfig: tlsConf,
|
||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
||||
Heartbeat: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
|
||||
for {
|
||||
closeChan := make(chan *amqp.Error)
|
||||
GetConn().NotifyClose(closeChan)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "context cancelled, exiting handleReconnect")
|
||||
return
|
||||
case err, ok := <-closeChan:
|
||||
if !ok {
|
||||
logger.Info(ctx, "rabbitMQ notify channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
|
||||
return
|
||||
}
|
||||
|
||||
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
|
||||
}
|
||||
|
||||
if !p.reconnect(ctx, rabbitMQURI) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
|
||||
for {
|
||||
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(5 * time.Second):
|
||||
|
||||
}
|
||||
|
||||
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
||||
TLSClientConfig: p.tlsConf,
|
||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
||||
Heartbeat: 10 * time.Second,
|
||||
})
|
||||
if err == nil {
|
||||
p.mu.Lock()
|
||||
p.conn = newConn
|
||||
p.mu.Unlock()
|
||||
logger.Info(ctx, "rabbitMQ reconnected successfully")
|
||||
return true
|
||||
}
|
||||
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
|
||||
func CloseRabbitProxy() {
|
||||
if _globalRabbitMQProxy != nil {
|
||||
_globalRabbitMQProxy.cancel()
|
||||
_globalRabbitMQProxy.mu.Lock()
|
||||
if _globalRabbitMQProxy.conn != nil {
|
||||
_globalRabbitMQProxy.conn.Close()
|
||||
}
|
||||
_globalRabbitMQProxy.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
|
||||
// TODO 考虑拆分用户名密码配置项,兼容不同认证方式
|
||||
// user := url.QueryEscape(rCfg.User)
|
||||
// password := url.QueryEscape(rCfg.Password)
|
||||
|
||||
// amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
|
||||
// user,
|
||||
// password,
|
||||
// rCfg.Host,
|
||||
// rCfg.Port,
|
||||
// )
|
||||
amqpURI := fmt.Sprintf("amqps://%s:%d/",
|
||||
rCfg.Host,
|
||||
rCfg.Port,
|
||||
)
|
||||
return amqpURI
|
||||
}
|
||||
|
||||
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: rCfg.InsecureSkipVerify,
|
||||
ServerName: rCfg.ServerName,
|
||||
}
|
||||
|
||||
caCert, err := os.ReadFile(rCfg.CACertPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read server ca file failed: %w", err)
|
||||
}
|
||||
caCertPool := x509.NewCertPool()
|
||||
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
|
||||
}
|
||||
tlsConf.RootCAs = caCertPool
|
||||
|
||||
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read client cert file failed: %w", err)
|
||||
}
|
||||
|
||||
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read private key file failed: %w", err)
|
||||
}
|
||||
|
||||
block, _ := pem.Decode(keyData)
|
||||
if block == nil {
|
||||
return nil, fmt.Errorf("failed to decode PEM block from private key")
|
||||
}
|
||||
|
||||
der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
|
||||
}
|
||||
|
||||
privBytes, err := x509.MarshalPKCS8PrivateKey(der)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal private key failed: %w", err)
|
||||
}
|
||||
|
||||
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
|
||||
|
||||
clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create x509 key pair failed: %w", err)
|
||||
}
|
||||
|
||||
tlsConf.Certificates = []tls.Certificate{clientCert}
|
||||
return tlsConf, nil
|
||||
}
|
||||
|
|
@ -3,18 +3,25 @@ package network
|
|||
|
||||
// FailureResponse define struct of standard failure API response format
|
||||
type FailureResponse struct {
|
||||
Code int `json:"code" example:"500"`
|
||||
Msg string `json:"msg" example:"failed to get recommend data from redis"`
|
||||
Code int `json:"code" example:"3000"`
|
||||
Msg string `json:"msg" example:"process completed with partial failures"`
|
||||
Payload any `json:"payload" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// SuccessResponse define struct of standard successful API response format
|
||||
type SuccessResponse struct {
|
||||
Code int `json:"code" example:"200"`
|
||||
Msg string `json:"msg" example:"success"`
|
||||
Code int `json:"code" example:"2000"`
|
||||
Msg string `json:"msg" example:"process completed"`
|
||||
Payload any `json:"payload" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// WSResponse define struct of standard websocket API response format
|
||||
type WSResponse struct {
|
||||
Code int `json:"code" example:"2000"`
|
||||
Msg string `json:"msg" example:"process completed"`
|
||||
Payload any `json:"payload,omitempty" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
|
||||
type MeasurementRecommendPayload struct {
|
||||
Input string `json:"input" example:"transformfeeder1_220."`
|
||||
|
|
@ -26,7 +33,7 @@ type MeasurementRecommendPayload struct {
|
|||
// TargetResult define struct of target item in real time data subscription response payload
|
||||
type TargetResult struct {
|
||||
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
|
||||
Code string `json:"code" example:"1001"`
|
||||
Code int `json:"code" example:"20000"`
|
||||
Msg string `json:"msg" example:"subscription success"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,11 +5,11 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/config"
|
||||
"modelRT/constants"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/real-time-data/alert"
|
||||
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -26,6 +26,13 @@ type teEventThresholds struct {
|
|||
isFloatCause bool
|
||||
}
|
||||
|
||||
type teBreachTrigger struct {
|
||||
breachType string
|
||||
triggered bool
|
||||
triggeredValues []float64
|
||||
eventOpts []event.EventOption
|
||||
}
|
||||
|
||||
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
||||
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
||||
t := teEventThresholds{}
|
||||
|
|
@ -84,60 +91,70 @@ func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeCo
|
|||
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
||||
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
||||
windowSize := conf.minBreachCount
|
||||
if windowSize <= 0 {
|
||||
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||
dataLen := len(realTimeValues)
|
||||
if dataLen < windowSize || windowSize <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// mark whether any events have been triggered in this batch
|
||||
var eventTriggered bool
|
||||
breachTriggers := map[string]bool{
|
||||
"up": false, "upup": false, "down": false, "downdown": false,
|
||||
statusArray := make([]string, dataLen)
|
||||
for i, val := range realTimeValues {
|
||||
statusArray[i] = getTEBreachType(val, thresholds)
|
||||
}
|
||||
|
||||
// implement slide window to determine breach counts
|
||||
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
||||
window := realTimeValues[i : i+windowSize]
|
||||
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
||||
breachTriggers := make(map[string]teBreachTrigger)
|
||||
for i := 0; i <= dataLen-windowSize; i++ {
|
||||
firstBreachType := statusArray[i]
|
||||
|
||||
if firstValueBreachType == "" {
|
||||
// if the first value in the window does not breach, skip this window directly
|
||||
if firstBreachType == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
allMatch := true
|
||||
for j := 1; j < windowSize; j++ {
|
||||
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
||||
if currentValueBreachType != firstValueBreachType {
|
||||
if statusArray[i+j] != firstBreachType {
|
||||
allMatch = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if allMatch {
|
||||
triggerValues := realTimeValues[i : i+windowSize]
|
||||
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
||||
if !breachTriggers[firstValueBreachType] {
|
||||
// trigger event
|
||||
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
||||
_, exists := breachTriggers[firstBreachType]
|
||||
if !exists {
|
||||
logger.Warn(ctx, "event triggered by sliding window",
|
||||
"breach_type", firstBreachType,
|
||||
"trigger_values", triggerValues)
|
||||
|
||||
breachTriggers[firstValueBreachType] = true
|
||||
eventTriggered = true
|
||||
// build Options
|
||||
opts := []event.EventOption{
|
||||
event.WithConditionValue(triggerValues, conf.Cause),
|
||||
event.WithTEAnalysisResult(firstBreachType),
|
||||
event.WithCategory(constants.EventUpDownRoutingKey),
|
||||
// TODO 生成 operations并考虑如何放入 event 中
|
||||
// event.WithOperations(nil)
|
||||
}
|
||||
breachTriggers[firstBreachType] = teBreachTrigger{
|
||||
breachType: firstBreachType,
|
||||
triggered: false,
|
||||
triggeredValues: triggerValues,
|
||||
eventOpts: opts,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if eventTriggered {
|
||||
command, content := genTEEventCommandAndContent(ctx, conf.Action)
|
||||
// TODO 考虑 content 是否可以为空,先期不允许
|
||||
if command == "" || content == "" {
|
||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||
return
|
||||
}
|
||||
event.TriggerEventAction(ctx, command, content)
|
||||
return
|
||||
for breachType, trigger := range breachTriggers {
|
||||
// trigger Action
|
||||
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
|
||||
eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
|
||||
event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) {
|
||||
func genTEEventCommandAndMainBody(ctx context.Context, action map[string]any) (command string, mainBody string) {
|
||||
cmdValue, exist := action["command"]
|
||||
if !exist {
|
||||
logger.Error(ctx, "can not find command variable into action map", "action", action)
|
||||
|
|
@ -185,7 +202,7 @@ type tiEventThresholds struct {
|
|||
isFloatCause bool
|
||||
}
|
||||
|
||||
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
||||
// parseTIThresholds define func to parse telesignal thresholds by casue map
|
||||
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
||||
edgeKey := "edge"
|
||||
t := tiEventThresholds{
|
||||
|
|
@ -211,11 +228,12 @@ func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
|||
|
||||
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
||||
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
||||
if t.edge == constants.TelesignalRaising {
|
||||
switch t.edge {
|
||||
case constants.TelesignalRaising:
|
||||
if previousValue == 0.0 && currentValue == 1.0 {
|
||||
return constants.TIBreachTriggerType
|
||||
}
|
||||
} else if t.edge == constants.TelesignalFalling {
|
||||
case constants.TelesignalFalling:
|
||||
if previousValue == 1.0 && currentValue == 0.0 {
|
||||
return constants.TIBreachTriggerType
|
||||
}
|
||||
|
|
@ -297,18 +315,17 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
|
|||
}
|
||||
|
||||
if eventTriggered {
|
||||
command, content := genTIEventCommandAndContent(conf.Action)
|
||||
// TODO 考虑 content 是否可以为空,先期不允许
|
||||
if command == "" || content == "" {
|
||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||
command, mainBody := genTIEventCommandAndMainBody(conf.Action)
|
||||
if command == "" || mainBody == "" {
|
||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
|
||||
return
|
||||
}
|
||||
event.TriggerEventAction(ctx, command, content)
|
||||
event.TriggerEventAction(ctx, command, mainBody)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
|
||||
func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) {
|
||||
cmdValue, exist := action["command"]
|
||||
if !exist {
|
||||
return "", ""
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
// Package event define real time data evnet operation functions
|
||||
package event
|
||||
|
||||
// EventRecord define struct for CIM event record
|
||||
type EventRecord struct {
|
||||
// 事件名称
|
||||
EventName string `json:"event"`
|
||||
// 事件唯一标识符
|
||||
EventUUID string `json:"event_uuid"`
|
||||
// 事件类型
|
||||
Type int `json:"type"`
|
||||
// 事件优先级 (0-9)
|
||||
Priority int `json:"priority"`
|
||||
// 事件状态
|
||||
Status int `json:"status"`
|
||||
// 可选模板参数
|
||||
Category string `json:"category,omitempty"`
|
||||
// 毫秒级时间戳 (Unix epoch)
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
// 事件来源 (station, platform, msa)
|
||||
From string `json:"from"`
|
||||
// 事件场景描述对象 (如阈值、当前值)
|
||||
Condition map[string]any `json:"condition"`
|
||||
// 与事件相关的订阅信息
|
||||
AttachedSubscriptions []any `json:"attached_subscriptions"`
|
||||
// 事件分析结果对象
|
||||
Result map[string]any `json:"result,omitempty"`
|
||||
// 操作历史记录 (CIM ActivityRecord)
|
||||
Operations []OperationRecord `json:"operations"`
|
||||
// 子站告警原始数据 (CIM Alarm 数据)
|
||||
Origin map[string]any `json:"origin,omitempty"`
|
||||
}
|
||||
|
||||
// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等
|
||||
type OperationRecord struct {
|
||||
Action string `json:"action"` // 执行的动作,如 "acknowledgment"
|
||||
Op string `json:"op"` // 操作人/操作账号标识
|
||||
TS int64 `json:"ts"` // 操作发生的毫秒时间戳
|
||||
}
|
||||
|
|
@ -3,11 +3,13 @@ package event
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"modelRT/logger"
|
||||
"modelRT/mq"
|
||||
)
|
||||
|
||||
type actionHandler func(ctx context.Context, content string) error
|
||||
type actionHandler func(ctx context.Context, content string, ops ...EventOption) error
|
||||
|
||||
// actionDispatchMap define variable to store all action handler into map
|
||||
var actionDispatchMap = map[string]actionHandler{
|
||||
|
|
@ -19,45 +21,68 @@ var actionDispatchMap = map[string]actionHandler{
|
|||
}
|
||||
|
||||
// TriggerEventAction define func to trigger event by action in compute config
|
||||
func TriggerEventAction(ctx context.Context, command string, content string) {
|
||||
func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) {
|
||||
handler, exists := actionDispatchMap[command]
|
||||
if !exists {
|
||||
logger.Error(ctx, "unknown action command", "command", command)
|
||||
return
|
||||
}
|
||||
err := handler(ctx, content)
|
||||
err := handler(ctx, eventName, ops...)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
|
||||
logger.Error(ctx, "action handler failed", "command", command, "event_name", eventName, "error", err)
|
||||
return
|
||||
}
|
||||
logger.Info(ctx, "action handler success", "command", command, "content", content)
|
||||
logger.Info(ctx, "action handler success", "command", command, "event_name", eventName)
|
||||
}
|
||||
|
||||
func handleInfoAction(ctx context.Context, content string) error {
|
||||
// 实际执行发送警告、记录日志等操作
|
||||
actionParams := content
|
||||
// ... logic to send info level event using actionParams ...
|
||||
logger.Warn(ctx, "trigger info event", "message", actionParams)
|
||||
func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
||||
eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "generate info event record failed", "error", err)
|
||||
return err
|
||||
}
|
||||
recordBytes, err := json.Marshal(eventRecord)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
||||
return err
|
||||
}
|
||||
mq.MsgChan <- recordBytes
|
||||
logger.Info(ctx, "trigger info event", "event_name", eventName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleWarningAction(ctx context.Context, content string) error {
|
||||
// 实际执行发送警告、记录日志等操作
|
||||
actionParams := content
|
||||
// ... logic to send warning level event using actionParams ...
|
||||
logger.Warn(ctx, "trigger warning event", "message", actionParams)
|
||||
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
||||
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "generate warning event record failed", "error", err)
|
||||
return err
|
||||
}
|
||||
recordBytes, err := json.Marshal(eventRecord)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
||||
return err
|
||||
}
|
||||
mq.MsgChan <- recordBytes
|
||||
logger.Info(ctx, "trigger warning event", "event_name", eventName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleErrorAction(ctx context.Context, content string) error {
|
||||
// 实际执行发送警告、记录日志等操作
|
||||
actionParams := content
|
||||
// ... logic to send error level event using actionParams ...
|
||||
logger.Warn(ctx, "trigger error event", "message", actionParams)
|
||||
func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
||||
eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "generate error event record failed", "error", err)
|
||||
return err
|
||||
}
|
||||
recordBytes, err := json.Marshal(eventRecord)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
||||
return err
|
||||
}
|
||||
mq.MsgChan <- recordBytes
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleCriticalAction(ctx context.Context, content string) error {
|
||||
func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) error {
|
||||
// 实际执行发送警告、记录日志等操作
|
||||
actionParams := content
|
||||
// ... logic to send critical level event using actionParams ...
|
||||
|
|
@ -65,7 +90,7 @@ func handleCriticalAction(ctx context.Context, content string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func handleExceptionAction(ctx context.Context, content string) error {
|
||||
func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) error {
|
||||
// 实际执行发送警告、记录日志等操作
|
||||
actionParams := content
|
||||
// ... logic to send except level event using actionParams ...
|
||||
|
|
|
|||
|
|
@ -0,0 +1,85 @@
|
|||
// Package event define real time data evnet operation functions
|
||||
package event
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// EventOption define option function type for event record creation
|
||||
type EventOption func(*EventRecord)
|
||||
|
||||
// WithCondition define option function to set event condition description
|
||||
func WithCondition(cond map[string]any) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
if cond != nil {
|
||||
e.Condition = cond
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithSubscriptions define option function to set event attached subscription information
|
||||
func WithSubscriptions(subs []any) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
if subs != nil {
|
||||
e.AttachedSubscriptions = subs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithOperations define option function to set event operation records
|
||||
func WithOperations(ops []OperationRecord) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
if ops != nil {
|
||||
e.Operations = ops
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithCategory define option function to set event category
|
||||
func WithCategory(cat string) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
e.Category = cat
|
||||
}
|
||||
}
|
||||
|
||||
// WithResult define option function to set event analysis result
|
||||
func WithResult(result map[string]any) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
e.Result = result
|
||||
}
|
||||
}
|
||||
|
||||
func WithTEAnalysisResult(breachType string) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
if e.Result == nil {
|
||||
e.Result = make(map[string]any)
|
||||
}
|
||||
|
||||
description := "数据异常"
|
||||
switch strings.ToLower(breachType) {
|
||||
case "upup":
|
||||
description = "超越上上限"
|
||||
case "up":
|
||||
description = "超越上限"
|
||||
case "down":
|
||||
description = "超越下限"
|
||||
case "downdown":
|
||||
description = "超越下下限"
|
||||
}
|
||||
|
||||
e.Result["analysis_desc"] = description
|
||||
e.Result["breach_type"] = breachType
|
||||
}
|
||||
}
|
||||
|
||||
// WithConditionValue define option function to set event condition with real time value and extra data
|
||||
func WithConditionValue(realTimeValue []float64, extraData map[string]any) EventOption {
|
||||
return func(e *EventRecord) {
|
||||
if e.Condition == nil {
|
||||
e.Condition = make(map[string]any)
|
||||
}
|
||||
e.Condition["real_time_value"] = realTimeValue
|
||||
maps.Copy(e.Condition, extraData)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
// Package event define real time data evnet operation functions
|
||||
package event
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"modelRT/constants"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
// NewPlatformEventRecord define func to create a new platform event record with common fields initialized
|
||||
func NewPlatformEventRecord(eventType int, priority int, eventName string, opts ...EventOption) (*EventRecord, error) {
|
||||
u, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate UUID: %w", err)
|
||||
}
|
||||
|
||||
record := &EventRecord{
|
||||
EventName: eventName,
|
||||
EventUUID: u.String(),
|
||||
Type: eventType,
|
||||
Priority: priority,
|
||||
Status: 1,
|
||||
From: constants.EventFromPlatform,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Condition: make(map[string]any),
|
||||
AttachedSubscriptions: make([]any, 0),
|
||||
Operations: make([]OperationRecord, 0),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(record)
|
||||
}
|
||||
|
||||
return record, nil
|
||||
}
|
||||
|
||||
// NewGeneralPlatformSoftRecord define func to create a new general platform software event record
|
||||
func NewGeneralPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventGeneralPlatformSoft), 0, name, opts...)
|
||||
}
|
||||
|
||||
// NewGeneralApplicationSoftRecord define func to create a new general application software event record
|
||||
func NewGeneralApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventGeneralApplicationSoft), 0, name, opts...)
|
||||
}
|
||||
|
||||
// NewWarnPlatformSoftRecord define func to create a new warning platform software event record
|
||||
func NewWarnPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventWarnPlatformSoft), 3, name, opts...)
|
||||
}
|
||||
|
||||
// NewWarnApplicationSoftRecord define func to create a new warning application software event record
|
||||
func NewWarnApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventWarnApplicationSoft), 3, name, opts...)
|
||||
}
|
||||
|
||||
// NewCriticalPlatformSoftRecord define func to create a new critical platform software event record
|
||||
func NewCriticalPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventCriticalPlatformSoft), 6, name, opts...)
|
||||
}
|
||||
|
||||
// NewCriticalApplicationSoftRecord define func to create a new critical application software event record
|
||||
func NewCriticalApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
||||
return NewPlatformEventRecord(int(constants.EventCriticalApplicationSoft), 6, name, opts...)
|
||||
}
|
||||
|
|
@ -1,400 +0,0 @@
|
|||
// Package realtimedata define real time data operation functions
|
||||
package realtimedata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/model"
|
||||
"modelRT/network"
|
||||
"modelRT/orm"
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
var (
|
||||
// RealTimeDataChan define channel of real time data receive
|
||||
RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
||||
globalComputeState *MeasComputeState
|
||||
)
|
||||
|
||||
func init() {
|
||||
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
||||
globalComputeState = NewMeasComputeState()
|
||||
}
|
||||
|
||||
// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info
|
||||
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
|
||||
for _, measurement := range measurements {
|
||||
enableValue, exist := measurement.EventPlan["enable"]
|
||||
enable, ok := enableValue.(bool)
|
||||
if !exist || !enable {
|
||||
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
|
||||
continue
|
||||
}
|
||||
|
||||
if !ok {
|
||||
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
|
||||
continue
|
||||
}
|
||||
|
||||
conf, err := initComputeConfig(measurement)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if conf == nil {
|
||||
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
||||
continue
|
||||
}
|
||||
|
||||
uuidStr := measurement.ComponentUUID.String()
|
||||
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
||||
conf.StopGchan = make(chan struct{})
|
||||
globalComputeState.Store(uuidStr, conf)
|
||||
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
|
||||
go continuousComputation(enrichedCtx, conf)
|
||||
}
|
||||
}
|
||||
|
||||
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
||||
var err error
|
||||
|
||||
enableValue, exist := measurement.EventPlan["enable"]
|
||||
enable, ok := enableValue.(bool)
|
||||
if !exist {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
||||
}
|
||||
|
||||
if !enable {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
conf := &ComputeConfig{}
|
||||
|
||||
causeValue, exist := measurement.EventPlan["cause"]
|
||||
if !exist {
|
||||
return nil, errors.New("missing required field cause")
|
||||
}
|
||||
|
||||
cause, ok := causeValue.(map[string]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
||||
}
|
||||
conf.Cause, err = processCauseMap(cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
||||
}
|
||||
|
||||
actionValue, exist := measurement.EventPlan["action"]
|
||||
if !exist {
|
||||
return nil, errors.New("missing required field action")
|
||||
}
|
||||
action, ok := actionValue.(map[string]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
||||
}
|
||||
conf.Action = action
|
||||
|
||||
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
||||
}
|
||||
conf.QueryKey = queryKey
|
||||
conf.DataSize = int64(measurement.Size)
|
||||
// TODO use constant values for temporary settings
|
||||
conf.minBreachCount = constants.MinBreachCount
|
||||
// TODO 后续优化 duration 创建方式
|
||||
conf.Duration = 10
|
||||
|
||||
isFloatCause := false
|
||||
if _, exists := conf.Cause["up"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["down"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["upup"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["downdown"]; exists {
|
||||
isFloatCause = true
|
||||
}
|
||||
|
||||
if isFloatCause {
|
||||
// te config
|
||||
teThresholds, err := parseTEThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
||||
} else {
|
||||
// ti config
|
||||
tiThresholds, err := parseTIThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func processCauseMap(data map[string]any) (map[string]any, error) {
|
||||
causeResult := make(map[string]any)
|
||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||
|
||||
var foundFloatKey bool
|
||||
for _, key := range keysToExtract {
|
||||
if value, exists := data[key]; exists {
|
||||
|
||||
foundFloatKey = true
|
||||
|
||||
// check value type
|
||||
if floatVal, ok := value.(float64); ok {
|
||||
causeResult[key] = floatVal
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if foundFloatKey == true {
|
||||
return causeResult, nil
|
||||
}
|
||||
|
||||
edgeKey := "edge"
|
||||
if value, exists := data[edgeKey]; exists {
|
||||
if stringVal, ok := value.(string); ok {
|
||||
switch stringVal {
|
||||
case "raising":
|
||||
fallthrough
|
||||
case "falling":
|
||||
causeResult[edgeKey] = stringVal
|
||||
default:
|
||||
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
||||
}
|
||||
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
||||
}
|
||||
|
||||
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
||||
client := diagram.NewRedisClient()
|
||||
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
||||
duration := util.SecondsToDuration(conf.Duration)
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-conf.StopGchan:
|
||||
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
||||
return
|
||||
case <-ticker.C:
|
||||
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
realTimedatas := util.ConvertZSetMembersToFloat64(members)
|
||||
if conf.Analyzer != nil {
|
||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||
} else {
|
||||
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // ReceiveChan define func to real time data receive and process
|
||||
// func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
|
||||
// consumer, err := kafka.NewConsumer(consumerConfig)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "create kafka consumer failed", "error", err)
|
||||
// return
|
||||
// }
|
||||
// defer consumer.Close()
|
||||
|
||||
// err = consumer.SubscribeTopics(topics, nil)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
// batchSize := 100
|
||||
// batchTimeout := util.SecondsToDuration(duration)
|
||||
// messages := make([]*kafka.Message, 0, batchSize)
|
||||
// lastCommit := time.Now()
|
||||
// logger.Info(ctx, "start consuming from kafka", "topic", topics)
|
||||
// for {
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// logger.Info(ctx, "stop real time data computing by context cancel")
|
||||
// return
|
||||
// case realTimeData := <-RealTimeDataChan:
|
||||
// componentUUID := realTimeData.PayLoad.ComponentUUID
|
||||
// component, err := diagram.GetComponentMap(componentUUID)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// componentType := component.Type
|
||||
// if componentType != constants.DemoType {
|
||||
// logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// var anchorName string
|
||||
// var compareValUpperLimit, compareValLowerLimit float64
|
||||
// var anchorRealTimeData []float64
|
||||
// var calculateFunc func(archorValue float64, args ...float64) float64
|
||||
|
||||
// // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)
|
||||
|
||||
// for _, param := range realTimeData.PayLoad.Values {
|
||||
// anchorRealTimeData = append(anchorRealTimeData, param.Value)
|
||||
// }
|
||||
|
||||
// anchorConfig := config.AnchorParamConfig{
|
||||
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
|
||||
// ComponentUUID: componentUUID,
|
||||
// AnchorName: anchorName,
|
||||
// CompareValUpperLimit: compareValUpperLimit,
|
||||
// CompareValLowerLimit: compareValLowerLimit,
|
||||
// AnchorRealTimeData: anchorRealTimeData,
|
||||
// },
|
||||
// CalculateFunc: calculateFunc,
|
||||
// CalculateParams: []float64{},
|
||||
// }
|
||||
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err)
|
||||
// continue
|
||||
// }
|
||||
// anchorChan <- anchorConfig
|
||||
// default:
|
||||
// msg, err := consumer.ReadMessage(batchTimeout)
|
||||
// if err != nil {
|
||||
// if err.(kafka.Error).Code() == kafka.ErrTimedOut {
|
||||
// // process accumulated messages when timeout
|
||||
// if len(messages) > 0 {
|
||||
// processMessageBatch(ctx, messages)
|
||||
// consumer.Commit()
|
||||
// messages = messages[:0]
|
||||
// }
|
||||
// continue
|
||||
// }
|
||||
// logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// messages = append(messages, msg)
|
||||
// // process messages when batch size or timeout period is reached
|
||||
// if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
|
||||
// processMessageBatch(ctx, messages)
|
||||
// consumer.Commit()
|
||||
// messages = messages[:0]
|
||||
// lastCommit = time.Now()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// type realTimeDataPayload struct {
|
||||
// ComponentUUID string
|
||||
// Values []float64
|
||||
// }
|
||||
|
||||
// type realTimeData struct {
|
||||
// Payload realTimeDataPayload
|
||||
// }
|
||||
|
||||
// func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
||||
// var realTimeData realTimeData
|
||||
// err := json.Unmarshal(msgValue, &realTimeData)
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||
// }
|
||||
// return &realTimeData, nil
|
||||
// }
|
||||
|
||||
// func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||
// componentUUID := realTimeData.Payload.ComponentUUID
|
||||
// component, err := diagram.GetComponentMap(componentUUID)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "query component info from diagram map by component id failed",
|
||||
// "component_uuid", componentUUID, "error", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
// componentType := component.Type
|
||||
// if componentType != constants.DemoType {
|
||||
// logger.Error(ctx, "can not process real time data of component type not equal DemoType",
|
||||
// "component_uuid", componentUUID)
|
||||
// return
|
||||
// }
|
||||
|
||||
// var anchorName string
|
||||
// var compareValUpperLimit, compareValLowerLimit float64
|
||||
// var anchorRealTimeData []float64
|
||||
// var calculateFunc func(archorValue float64, args ...float64) float64
|
||||
|
||||
// for _, param := range realTimeData.Payload.Values {
|
||||
// anchorRealTimeData = append(anchorRealTimeData, param)
|
||||
// }
|
||||
|
||||
// anchorConfig := config.AnchorParamConfig{
|
||||
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
|
||||
// ComponentUUID: componentUUID,
|
||||
// AnchorName: anchorName,
|
||||
// CompareValUpperLimit: compareValUpperLimit,
|
||||
// CompareValLowerLimit: compareValLowerLimit,
|
||||
// AnchorRealTimeData: anchorRealTimeData,
|
||||
// },
|
||||
// CalculateFunc: calculateFunc,
|
||||
// CalculateParams: []float64{},
|
||||
// }
|
||||
|
||||
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "get anchor param chan failed",
|
||||
// "component_uuid", componentUUID, "error", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
// select {
|
||||
// case anchorChan <- anchorConfig:
|
||||
// case <-ctx.Done():
|
||||
// logger.Info(ctx, "context done while sending to anchor chan")
|
||||
// case <-time.After(5 * time.Second):
|
||||
// logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID)
|
||||
// }
|
||||
// }
|
||||
|
||||
// // processMessageBatch define func to bathc process kafka message
|
||||
// func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
|
||||
// for _, msg := range messages {
|
||||
// realTimeData, err := parseKafkaMessage(msg.Value)
|
||||
// if err != nil {
|
||||
// logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg)
|
||||
// continue
|
||||
// }
|
||||
// go processRealTimeData(ctx, realTimeData)
|
||||
// }
|
||||
// }
|
||||
|
|
@ -0,0 +1,229 @@
|
|||
// Package realtimedata define real time data operation functions
|
||||
package realtimedata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/model"
|
||||
"modelRT/network"
|
||||
"modelRT/orm"
|
||||
"modelRT/util"
|
||||
)
|
||||
|
||||
var (
|
||||
// RealTimeDataChan define channel of real time data receive
|
||||
RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
||||
globalComputeState *MeasComputeState
|
||||
)
|
||||
|
||||
func init() {
|
||||
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
||||
globalComputeState = NewMeasComputeState()
|
||||
}
|
||||
|
||||
// StartComputingRealTimeDataLimit define func to start compute real time data up or down limit process goroutines by measurement info
|
||||
func StartComputingRealTimeDataLimit(ctx context.Context, measurements []orm.Measurement) {
|
||||
for _, measurement := range measurements {
|
||||
enableValue, exist := measurement.EventPlan["enable"]
|
||||
enable, ok := enableValue.(bool)
|
||||
if !exist || !enable {
|
||||
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
|
||||
continue
|
||||
}
|
||||
|
||||
if !ok {
|
||||
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
|
||||
continue
|
||||
}
|
||||
|
||||
conf, err := initComputeConfig(measurement)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if conf == nil {
|
||||
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
||||
continue
|
||||
}
|
||||
|
||||
uuidStr := measurement.ComponentUUID.String()
|
||||
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
||||
conf.StopGchan = make(chan struct{})
|
||||
globalComputeState.Store(uuidStr, conf)
|
||||
logger.Info(ctx, "starting computing real time data limit for measurement", "measurement_uuid", measurement.ComponentUUID)
|
||||
go continuousComputation(enrichedCtx, conf)
|
||||
}
|
||||
}
|
||||
|
||||
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
||||
var err error
|
||||
|
||||
enableValue, exist := measurement.EventPlan["enable"]
|
||||
enable, ok := enableValue.(bool)
|
||||
if !exist {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
||||
}
|
||||
|
||||
if !enable {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
conf := &ComputeConfig{}
|
||||
|
||||
causeValue, exist := measurement.EventPlan["cause"]
|
||||
if !exist {
|
||||
return nil, errors.New("missing required field cause")
|
||||
}
|
||||
|
||||
cause, ok := causeValue.(map[string]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
||||
}
|
||||
conf.Cause, err = processCauseMap(cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
||||
}
|
||||
|
||||
actionValue, exist := measurement.EventPlan["action"]
|
||||
if !exist {
|
||||
return nil, errors.New("missing required field action")
|
||||
}
|
||||
action, ok := actionValue.(map[string]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
||||
}
|
||||
conf.Action = action
|
||||
|
||||
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
||||
}
|
||||
conf.QueryKey = queryKey
|
||||
conf.DataSize = int64(measurement.Size)
|
||||
// TODO use constant values for temporary settings
|
||||
conf.minBreachCount = constants.MinBreachCount
|
||||
// TODO 后续优化 duration 创建方式
|
||||
conf.Duration = 10
|
||||
|
||||
isFloatCause := false
|
||||
if _, exists := conf.Cause["up"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["down"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["upup"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["downdown"]; exists {
|
||||
isFloatCause = true
|
||||
}
|
||||
|
||||
if isFloatCause {
|
||||
// te config
|
||||
teThresholds, err := parseTEThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
||||
} else {
|
||||
// ti config
|
||||
tiThresholds, err := parseTIThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func processCauseMap(data map[string]any) (map[string]any, error) {
|
||||
causeResult := make(map[string]any)
|
||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||
|
||||
var foundFloatKey bool
|
||||
for _, key := range keysToExtract {
|
||||
if value, exists := data[key]; exists {
|
||||
|
||||
foundFloatKey = true
|
||||
|
||||
// check value type
|
||||
if floatVal, ok := value.(float64); ok {
|
||||
causeResult[key] = floatVal
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if foundFloatKey {
|
||||
return causeResult, nil
|
||||
}
|
||||
|
||||
edgeKey := "edge"
|
||||
if value, exists := data[edgeKey]; exists {
|
||||
if stringVal, ok := value.(string); ok {
|
||||
switch stringVal {
|
||||
case "raising":
|
||||
fallthrough
|
||||
case "falling":
|
||||
causeResult[edgeKey] = stringVal
|
||||
default:
|
||||
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
||||
}
|
||||
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
||||
}
|
||||
|
||||
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
||||
client := diagram.NewRedisClient()
|
||||
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
||||
duration := util.SecondsToDuration(conf.Duration)
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-conf.StopGchan:
|
||||
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
||||
return
|
||||
case <-ticker.C:
|
||||
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
|
||||
cancel()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
realTimedatas := util.ConvertZSetMembersToFloat64(members)
|
||||
if len(realTimedatas) == 0 {
|
||||
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
|
||||
continue
|
||||
}
|
||||
|
||||
if conf.Analyzer != nil {
|
||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||
} else {
|
||||
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,97 +0,0 @@
|
|||
package sharememory
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"modelRT/orm"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// CreateShareMemory defines a function to create a shared memory
|
||||
func CreateShareMemory(key uintptr, structSize uintptr) (uintptr, error) {
|
||||
// logger := logger.GetLoggerInstance()
|
||||
// create shared memory
|
||||
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, structSize, unix.IPC_CREAT|0o666)
|
||||
if err != 0 {
|
||||
// logger.Error(fmt.Sprintf("create shared memory by key %v failed:", key), zap.Error(err))
|
||||
return 0, fmt.Errorf("create shared memory failed:%w", err)
|
||||
}
|
||||
|
||||
// attach shared memory
|
||||
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||
if err != 0 {
|
||||
// logger.Error(fmt.Sprintf("attach shared memory by shmID %v failed:", shmID), zap.Error(err))
|
||||
return 0, fmt.Errorf("attach shared memory failed:%w", err)
|
||||
}
|
||||
return shmAddr, nil
|
||||
}
|
||||
|
||||
// ReadComponentFromShareMemory defines a function to read component value from shared memory
|
||||
func ReadComponentFromShareMemory(key uintptr, componentInfo *orm.Component) error {
|
||||
structSize := unsafe.Sizeof(orm.Component{})
|
||||
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666)
|
||||
if err != 0 {
|
||||
return fmt.Errorf("get shared memory failed:%w", err)
|
||||
}
|
||||
|
||||
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||
if err != 0 {
|
||||
return fmt.Errorf("attach shared memory failed:%w", err)
|
||||
}
|
||||
|
||||
// 读取共享内存中的数据
|
||||
componentInfo = (*orm.Component)(unsafe.Pointer(shmAddr + structSize))
|
||||
|
||||
// Detach shared memory
|
||||
unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func WriteComponentInShareMemory(key uintptr, componentInfo *orm.Component) error {
|
||||
structSize := unsafe.Sizeof(orm.Component{})
|
||||
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666)
|
||||
if err != 0 {
|
||||
return fmt.Errorf("get shared memory failed:%w", err)
|
||||
}
|
||||
|
||||
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||
if err != 0 {
|
||||
return fmt.Errorf("attach shared memory failed:%w", err)
|
||||
}
|
||||
|
||||
obj := (*orm.Component)(unsafe.Pointer(shmAddr + unsafe.Sizeof(structSize)))
|
||||
fmt.Println(obj)
|
||||
|
||||
// id integer NOT NULL DEFAULT nextval('component_id_seq'::regclass),
|
||||
// global_uuid uuid NOT NULL DEFAULT gen_random_uuid(),
|
||||
// nspath character varying(32) COLLATE pg_catalog."default",
|
||||
// tag character varying(32) COLLATE pg_catalog."default" NOT NULL,
|
||||
// name character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||
// description character varying(512) COLLATE pg_catalog."default" NOT NULL DEFAULT ''::character varying,
|
||||
// grid character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||
// zone character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||
// station character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||
// type integer NOT NULL,
|
||||
// in_service boolean DEFAULT false,
|
||||
// state integer NOT NULL DEFAULT 0,
|
||||
// connected_bus jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
// label jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
// context jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
// page_id integer NOT NULL,
|
||||
// op integer NOT NULL DEFAULT '-1'::integer,
|
||||
// ts timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteShareMemory defines a function to delete shared memory
|
||||
func DeleteShareMemory(key uintptr) error {
|
||||
_, _, err := unix.Syscall(unix.SYS_SHM_UNLINK, key, 0, 0o666)
|
||||
if err != 0 {
|
||||
return fmt.Errorf("get shared memory failed:%w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@ package util
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
|
@ -13,25 +14,36 @@ import (
|
|||
)
|
||||
|
||||
// NewRedisClient define func of initialize the Redis client
|
||||
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
|
||||
func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
|
||||
// default options
|
||||
options := RedisOptions{
|
||||
redisOptions: &redis.Options{
|
||||
configs := &clientConfig{
|
||||
Options: &redis.Options{
|
||||
Addr: addr,
|
||||
DialTimeout: 5 * time.Second,
|
||||
ReadTimeout: 3 * time.Second,
|
||||
WriteTimeout: 3 * time.Second,
|
||||
PoolSize: 10,
|
||||
},
|
||||
}
|
||||
|
||||
// Apply configuration options from config
|
||||
var errs []error
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
if err := opt(configs); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
// create redis client
|
||||
client := redis.NewClient(options.redisOptions)
|
||||
client := redis.NewClient(configs.Options)
|
||||
|
||||
if options.timeout > 0 {
|
||||
if configs.DialTimeout > 0 {
|
||||
// check if the connection is successful
|
||||
ctx, cancel := context.WithTimeout(context.Background(), options.timeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
|
||||
defer cancel()
|
||||
if err := client.Ping(ctx).Err(); err != nil {
|
||||
return nil, fmt.Errorf("can not connect redis:%v", err)
|
||||
|
|
@ -43,22 +55,29 @@ func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
|
|||
// NewRedigoPool define func of initialize the Redigo pool
|
||||
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
||||
pool := &redigo.Pool{
|
||||
// TODO optimize IdleTimeout with config parameter
|
||||
MaxIdle: rCfg.PoolSize / 2,
|
||||
MaxActive: rCfg.PoolSize,
|
||||
// TODO optimize IdleTimeout with config parameter
|
||||
IdleTimeout: 240 * time.Second,
|
||||
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
|
||||
if time.Since(t) < time.Minute {
|
||||
return nil
|
||||
}
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
|
||||
// Dial function to create the connection
|
||||
Dial: func() (redigo.Conn, error) {
|
||||
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒
|
||||
dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
|
||||
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
|
||||
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
|
||||
|
||||
opts := []redigo.DialOption{
|
||||
redigo.DialDatabase(rCfg.DB),
|
||||
redigo.DialPassword(rCfg.Password),
|
||||
redigo.DialConnectTimeout(timeout),
|
||||
// redigo.DialReadTimeout(timeout),
|
||||
// redigo.DialWriteTimeout(timeout),
|
||||
// TODO add redigo.DialUsername when redis open acl
|
||||
// redis.DialUsername("username"),
|
||||
redigo.DialConnectTimeout(dialTimeout),
|
||||
redigo.DialReadTimeout(readTimeout),
|
||||
redigo.DialWriteTimeout(writeTimeout),
|
||||
}
|
||||
|
||||
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
|
||||
|
|
@ -72,13 +91,14 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
|||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if conn.Err() != nil {
|
||||
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err())
|
||||
if err := conn.Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
|
||||
}
|
||||
|
||||
_, err := conn.Do("PING")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,56 +5,79 @@ import (
|
|||
"errors"
|
||||
"time"
|
||||
|
||||
"modelRT/constants"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type RedisOptions struct {
|
||||
redisOptions *redis.Options
|
||||
timeout time.Duration
|
||||
type clientConfig struct {
|
||||
*redis.Options
|
||||
}
|
||||
|
||||
type RedisOption func(*RedisOptions) error
|
||||
type Option func(*clientConfig) error
|
||||
|
||||
// WithPassword define func of configure redis password options
|
||||
func WithPassword(password string) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
if password == "" {
|
||||
func WithPassword(password string, env string) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if env == constants.ProductionDeployMode && password == "" {
|
||||
return errors.New("password is empty")
|
||||
}
|
||||
o.redisOptions.Password = password
|
||||
c.Password = password
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout define func of configure redis timeout options
|
||||
func WithTimeout(timeout time.Duration) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
// WithConnectTimeout define func of configure redis connect timeout options
|
||||
func WithConnectTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
o.timeout = timeout
|
||||
c.DialTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadTimeout define func of configure redis read timeout options
|
||||
func WithReadTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
c.ReadTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWriteTimeout define func of configure redis write timeout options
|
||||
func WithWriteTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
c.WriteTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithDB define func of configure redis db options
|
||||
func WithDB(db int) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
func WithDB(db int) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if db < 0 {
|
||||
return errors.New("db can not be negative")
|
||||
}
|
||||
o.redisOptions.DB = db
|
||||
c.DB = db
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPoolSize define func of configure pool size options
|
||||
func WithPoolSize(poolSize int) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
func WithPoolSize(poolSize int) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if poolSize <= 0 {
|
||||
return errors.New("pool size must be greater than 0")
|
||||
}
|
||||
o.redisOptions.PoolSize = poolSize
|
||||
c.PoolSize = poolSize
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue