Compare commits
No commits in common. "feature/bay-realtime-data-calc" and "develop" have entirely different histories.
feature/ba
...
develop
|
|
@ -27,16 +27,3 @@ go.work
|
||||||
/log/
|
/log/
|
||||||
# Shield config files in the configs folder
|
# Shield config files in the configs folder
|
||||||
/configs/**/*.yaml
|
/configs/**/*.yaml
|
||||||
/configs/**/*.pem
|
|
||||||
|
|
||||||
# ai config
|
|
||||||
.cursor/
|
|
||||||
.claude/
|
|
||||||
.cursorrules
|
|
||||||
.copilot/
|
|
||||||
.chatgpt/
|
|
||||||
.ai_history/
|
|
||||||
.vector_cache/
|
|
||||||
ai-debug.log
|
|
||||||
*.patch
|
|
||||||
*.diff
|
|
||||||
|
|
@ -16,12 +16,6 @@ var (
|
||||||
|
|
||||||
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||||
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
||||||
// ErrSubTargetRepeat define variable to indicates subscription target already exist in list
|
|
||||||
ErrSubTargetRepeat = newError(40005, "subscription target already exist in list")
|
|
||||||
// ErrSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
|
||||||
ErrSubTargetNotFound = newError(40006, "found measuremnet by subscription target failed")
|
|
||||||
// ErrCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
|
||||||
ErrCancelSubTargetMissing = newError(40007, "cancel a not exist subscription target")
|
|
||||||
|
|
||||||
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
|
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
|
||||||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||||
|
|
|
||||||
|
|
@ -1,10 +0,0 @@
|
||||||
// Package common define common error variables
|
|
||||||
package common
|
|
||||||
|
|
||||||
import "errors"
|
|
||||||
|
|
||||||
// ErrUnknowEventActionCommand define error of unknown event action command
|
|
||||||
var ErrUnknowEventActionCommand = errors.New("unknown action command")
|
|
||||||
|
|
||||||
// ErrExecEventActionFailed define error of execute event action failed
|
|
||||||
var ErrExecEventActionFailed = errors.New("exec event action func failed")
|
|
||||||
|
|
@ -44,11 +44,10 @@ var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
|
||||||
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
|
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
|
||||||
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
|
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
|
||||||
if componentType == constants.DemoType {
|
if componentType == constants.DemoType {
|
||||||
switch anchorName {
|
if anchorName == "voltage" {
|
||||||
case "voltage":
|
|
||||||
resistance := componentData["resistance"].(float64)
|
resistance := componentData["resistance"].(float64)
|
||||||
return baseVoltageFunc, []float64{resistance}
|
return baseVoltageFunc, []float64{resistance}
|
||||||
case "current":
|
} else if anchorName == "current" {
|
||||||
resistance := componentData["resistance"].(float64)
|
resistance := componentData["resistance"].(float64)
|
||||||
return baseCurrentFunc, []float64{resistance}
|
return baseCurrentFunc, []float64{resistance}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,21 +19,6 @@ type ServiceConfig struct {
|
||||||
ServiceAddr string `mapstructure:"service_addr"`
|
ServiceAddr string `mapstructure:"service_addr"`
|
||||||
ServiceName string `mapstructure:"service_name"`
|
ServiceName string `mapstructure:"service_name"`
|
||||||
SecretKey string `mapstructure:"secret_key"`
|
SecretKey string `mapstructure:"secret_key"`
|
||||||
DeployEnv string `mapstructure:"deploy_env"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// RabbitMQConfig define config struct of RabbitMQ config
|
|
||||||
type RabbitMQConfig struct {
|
|
||||||
CACertPath string `mapstructure:"ca_cert_path"`
|
|
||||||
ClientKeyPath string `mapstructure:"client_key_path"`
|
|
||||||
ClientKeyPassword string `mapstructure:"client_key_password"`
|
|
||||||
ClientCertPath string `mapstructure:"client_cert_path"`
|
|
||||||
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
|
|
||||||
ServerName string `mapstructure:"server_name"`
|
|
||||||
User string `mapstructure:"user"`
|
|
||||||
Password string `mapstructure:"password"`
|
|
||||||
Host string `mapstructure:"host"`
|
|
||||||
Port int `mapstructure:"port"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// KafkaConfig define config struct of kafka config
|
// KafkaConfig define config struct of kafka config
|
||||||
|
|
@ -72,9 +57,7 @@ type RedisConfig struct {
|
||||||
Password string `mapstructure:"password"`
|
Password string `mapstructure:"password"`
|
||||||
DB int `mapstructure:"db"`
|
DB int `mapstructure:"db"`
|
||||||
PoolSize int `mapstructure:"poolsize"`
|
PoolSize int `mapstructure:"poolsize"`
|
||||||
DialTimeout int `mapstructure:"dial_timeout"`
|
Timeout int `mapstructure:"timeout"`
|
||||||
ReadTimeout int `mapstructure:"read_timeout"`
|
|
||||||
WriteTimeout int `mapstructure:"write_timeout"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AntsConfig define config struct of ants pool config
|
// AntsConfig define config struct of ants pool config
|
||||||
|
|
@ -96,7 +79,6 @@ type ModelRTConfig struct {
|
||||||
BaseConfig `mapstructure:"base"`
|
BaseConfig `mapstructure:"base"`
|
||||||
ServiceConfig `mapstructure:"service"`
|
ServiceConfig `mapstructure:"service"`
|
||||||
PostgresConfig `mapstructure:"postgres"`
|
PostgresConfig `mapstructure:"postgres"`
|
||||||
RabbitMQConfig `mapstructure:"rabbitmq"`
|
|
||||||
KafkaConfig `mapstructure:"kafka"`
|
KafkaConfig `mapstructure:"kafka"`
|
||||||
LoggerConfig `mapstructure:"logger"`
|
LoggerConfig `mapstructure:"logger"`
|
||||||
AntsConfig `mapstructure:"ants"`
|
AntsConfig `mapstructure:"ants"`
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
// Package constants define constant variable
|
||||||
|
package constants
|
||||||
|
|
||||||
|
const (
|
||||||
|
// CodeSuccess define constant to indicates that the API was successfully processed
|
||||||
|
CodeSuccess = 20000
|
||||||
|
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
||||||
|
CodeInvalidParamFailed = 40001
|
||||||
|
// CodeDBQueryFailed define constant to indicates database query operation failed
|
||||||
|
CodeDBQueryFailed = 50001
|
||||||
|
// CodeDBUpdateailed define constant to indicates database update operation failed
|
||||||
|
CodeDBUpdateailed = 50002
|
||||||
|
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
||||||
|
CodeRedisQueryFailed = 60001
|
||||||
|
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
||||||
|
CodeRedisUpdateFailed = 60002
|
||||||
|
)
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
// Package constants define constant variable
|
|
||||||
package constants
|
|
||||||
|
|
||||||
const (
|
|
||||||
// CodeSuccess define constant to indicates that the API was successfully processed
|
|
||||||
CodeSuccess = 20000
|
|
||||||
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
|
||||||
CodeInvalidParamFailed = 40001
|
|
||||||
// CodeFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
|
||||||
CodeFoundTargetFailed = 40004
|
|
||||||
// CodeSubTargetRepeat define variable to indicates subscription target already exist in list
|
|
||||||
CodeSubTargetRepeat = 40005
|
|
||||||
// CodeSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
|
||||||
CodeSubTargetNotFound = 40006
|
|
||||||
// CodeCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
|
||||||
CodeCancelSubTargetMissing = 40007
|
|
||||||
// CodeUpdateSubTargetMissing define variable to indicates update a not exist subscription target
|
|
||||||
CodeUpdateSubTargetMissing = 40008
|
|
||||||
// CodeAppendSubTargetMissing define variable to indicates append a not exist subscription target
|
|
||||||
CodeAppendSubTargetMissing = 40009
|
|
||||||
// CodeUnsupportSubOperation define variable to indicates append a not exist subscription target
|
|
||||||
CodeUnsupportSubOperation = 40010
|
|
||||||
// CodeDBQueryFailed define constant to indicates database query operation failed
|
|
||||||
CodeDBQueryFailed = 50001
|
|
||||||
// CodeDBUpdateailed define constant to indicates database update operation failed
|
|
||||||
CodeDBUpdateailed = 50002
|
|
||||||
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
|
||||||
CodeRedisQueryFailed = 60001
|
|
||||||
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
|
||||||
CodeRedisUpdateFailed = 60002
|
|
||||||
)
|
|
||||||
|
|
@ -1,11 +0,0 @@
|
||||||
// Package constants define constant variable
|
|
||||||
package constants
|
|
||||||
|
|
||||||
const (
|
|
||||||
// DevelopmentDeployMode define development operator environment for modelRT project
|
|
||||||
DevelopmentDeployMode = "development"
|
|
||||||
// DebugDeployMode define debug operator environment for modelRT project
|
|
||||||
DebugDeployMode = "debug"
|
|
||||||
// ProductionDeployMode define production operator environment for modelRT project
|
|
||||||
ProductionDeployMode = "production"
|
|
||||||
)
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
// Package common define common error variables
|
// Package constants define constant variable
|
||||||
package common
|
package constants
|
||||||
|
|
||||||
import "errors"
|
import "errors"
|
||||||
|
|
||||||
|
|
@ -1,92 +1,31 @@
|
||||||
// Package constants define constant variable
|
// Package constants define constant variable
|
||||||
package constants
|
package constants
|
||||||
|
|
||||||
// EvenvtType define event type
|
|
||||||
type EvenvtType int
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EventGeneralHard define gereral hard event type
|
// TIBreachTriggerType define out of bounds type constant
|
||||||
EventGeneralHard EvenvtType = iota
|
TIBreachTriggerType = "trigger"
|
||||||
// EventGeneralPlatformSoft define gereral platform soft event type
|
|
||||||
EventGeneralPlatformSoft
|
|
||||||
// EventGeneralApplicationSoft define gereral application soft event type
|
|
||||||
EventGeneralApplicationSoft
|
|
||||||
// EventWarnHard define warn hard event type
|
|
||||||
EventWarnHard
|
|
||||||
// EventWarnPlatformSoft define warn platform soft event type
|
|
||||||
EventWarnPlatformSoft
|
|
||||||
// EventWarnApplicationSoft define warn application soft event type
|
|
||||||
EventWarnApplicationSoft
|
|
||||||
// EventCriticalHard define critical hard event type
|
|
||||||
EventCriticalHard
|
|
||||||
// EventCriticalPlatformSoft define critical platform soft event type
|
|
||||||
EventCriticalPlatformSoft
|
|
||||||
// EventCriticalApplicationSoft define critical application soft event type
|
|
||||||
EventCriticalApplicationSoft
|
|
||||||
)
|
|
||||||
|
|
||||||
// IsGeneral define fucn to check event type is general
|
|
||||||
func IsGeneral(eventType EvenvtType) bool {
|
|
||||||
return eventType < 3
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsWarning define fucn to check event type is warn
|
|
||||||
func IsWarning(eventType EvenvtType) bool {
|
|
||||||
return eventType >= 3 && eventType <= 5
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsCritical define fucn to check event type is critical
|
|
||||||
func IsCritical(eventType EvenvtType) bool {
|
|
||||||
return eventType >= 6
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
// EventFromStation define event from station type
|
|
||||||
EventFromStation = "station"
|
|
||||||
// EventFromPlatform define event from platform type
|
|
||||||
EventFromPlatform = "platform"
|
|
||||||
// EventFromOthers define event from others type
|
|
||||||
EventFromOthers = "others"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EventStatusHappended define status for event record when event just happened, no data attached yet
|
// TelemetryUpLimit define telemetry upper limit
|
||||||
EventStatusHappended = iota
|
TelemetryUpLimit = "up"
|
||||||
// EventStatusDataAttached define status for event record when event just happened, data attached already
|
// TelemetryUpUpLimit define telemetry upper upper limit
|
||||||
EventStatusDataAttached
|
TelemetryUpUpLimit = "upup"
|
||||||
// EventStatusReported define status for event record when event reported to CIM, no matter it's successful or failed
|
|
||||||
EventStatusReported
|
// TelemetryDownLimit define telemetry limit
|
||||||
// EventStatusConfirmed define status for event record when event confirmed by CIM, no matter it's successful or failed
|
TelemetryDownLimit = "down"
|
||||||
EventStatusConfirmed
|
// TelemetryDownDownLimit define telemetry lower lower limit
|
||||||
// EventStatusPersisted define status for event record when event persisted in database, no matter it's successful or failed
|
TelemetryDownDownLimit = "downdown"
|
||||||
EventStatusPersisted
|
|
||||||
// EventStatusClosed define status for event record when event closed, no matter it's successful or failed
|
|
||||||
EventStatusClosed
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EventExchangeName define exchange name for event alarm message
|
// TelesignalRaising define telesignal raising edge
|
||||||
EventExchangeName = "event-exchange"
|
TelesignalRaising = "raising"
|
||||||
// EventDeadExchangeName define dead letter exchange name for event alarm message
|
// TelesignalFalling define telesignal falling edge
|
||||||
EventDeadExchangeName = "event-dead-letter-exchange"
|
TelesignalFalling = "falling"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EventUpDownRoutingKey define routing key for up or down limit event alarm message
|
// MinBreachCount define min breach count of real time data
|
||||||
EventUpDownRoutingKey = "event.#"
|
MinBreachCount = 10
|
||||||
// EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message
|
|
||||||
EventUpDownDeadRoutingKey = "event.#"
|
|
||||||
// EventUpDownQueueName define queue name for up or down limit event alarm message
|
|
||||||
EventUpDownQueueName = "event-up-down-queue"
|
|
||||||
// EventUpDownDeadQueueName define dead letter queue name for event alarm message
|
|
||||||
EventUpDownDeadQueueName = "event-dead-letter-queue"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// EventGeneralUpDownLimitCategroy define category for general up and down limit event
|
|
||||||
EventGeneralUpDownLimitCategroy = "event.general.updown.limit"
|
|
||||||
// EventWarnUpDownLimitCategroy define category for warn up and down limit event
|
|
||||||
EventWarnUpDownLimitCategroy = "event.warn.updown.limit"
|
|
||||||
// EventCriticalUpDownLimitCategroy define category for critical up and down limit event
|
|
||||||
EventCriticalUpDownLimitCategroy = "event.critical.updown.limit"
|
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,29 @@ const (
|
||||||
SubUpdateAction string = "update"
|
SubUpdateAction string = "update"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// 定义状态常量
|
||||||
|
// TODO 从4位格式修改为5位格式
|
||||||
|
const (
|
||||||
|
// SubSuccessCode define subscription success code
|
||||||
|
SubSuccessCode = "1001"
|
||||||
|
// SubFailedCode define subscription failed code
|
||||||
|
SubFailedCode = "1002"
|
||||||
|
// RTDSuccessCode define real time data return success code
|
||||||
|
RTDSuccessCode = "1003"
|
||||||
|
// RTDFailedCode define real time data return failed code
|
||||||
|
RTDFailedCode = "1004"
|
||||||
|
// CancelSubSuccessCode define cancel subscription success code
|
||||||
|
CancelSubSuccessCode = "1005"
|
||||||
|
// CancelSubFailedCode define cancel subscription failed code
|
||||||
|
CancelSubFailedCode = "1006"
|
||||||
|
// SubRepeatCode define subscription repeat code
|
||||||
|
SubRepeatCode = "1007"
|
||||||
|
// UpdateSubSuccessCode define update subscription success code
|
||||||
|
UpdateSubSuccessCode = "1008"
|
||||||
|
// UpdateSubFailedCode define update subscription failed code
|
||||||
|
UpdateSubFailedCode = "1009"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
|
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
|
||||||
SysCtrlPrefix = "SYS_CTRL_"
|
SysCtrlPrefix = "SYS_CTRL_"
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
// Package constants define constant variable
|
|
||||||
package constants
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TIBreachTriggerType define out of bounds type constant
|
|
||||||
TIBreachTriggerType = "trigger"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TelemetryUpLimit define telemetry upper limit
|
|
||||||
TelemetryUpLimit = "up"
|
|
||||||
// TelemetryUpUpLimit define telemetry upper upper limit
|
|
||||||
TelemetryUpUpLimit = "upup"
|
|
||||||
|
|
||||||
// TelemetryDownLimit define telemetry limit
|
|
||||||
TelemetryDownLimit = "down"
|
|
||||||
// TelemetryDownDownLimit define telemetry lower lower limit
|
|
||||||
TelemetryDownDownLimit = "downdown"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TelesignalRaising define telesignal raising edge
|
|
||||||
TelesignalRaising = "raising"
|
|
||||||
// TelesignalFalling define telesignal falling edge
|
|
||||||
TelesignalFalling = "falling"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// MinBreachCount define min breach count of real time data
|
|
||||||
MinBreachCount = 10
|
|
||||||
)
|
|
||||||
|
|
@ -1,321 +0,0 @@
|
||||||
// Package database define database operation functions
|
|
||||||
package database
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/orm"
|
|
||||||
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"gorm.io/gorm/clause"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CreateAsyncTask creates a new async task in the database
|
|
||||||
func CreateAsyncTask(ctx context.Context, tx *gorm.DB, taskType orm.AsyncTaskType, params orm.JSONMap) (*orm.AsyncTask, error) {
|
|
||||||
taskID, err := uuid.NewV4()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
task := &orm.AsyncTask{
|
|
||||||
TaskID: taskID,
|
|
||||||
TaskType: taskType,
|
|
||||||
Status: orm.AsyncTaskStatusSubmitted,
|
|
||||||
Params: params,
|
|
||||||
CreatedAt: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).Create(task)
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return task, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAsyncTaskByID retrieves an async task by its ID
|
|
||||||
func GetAsyncTaskByID(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTask, error) {
|
|
||||||
var task orm.AsyncTask
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
|
||||||
First(&task)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return &task, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAsyncTasksByIDs retrieves multiple async tasks by their IDs
|
|
||||||
func GetAsyncTasksByIDs(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTask, error) {
|
|
||||||
var tasks []orm.AsyncTask
|
|
||||||
|
|
||||||
if len(taskIDs) == 0 {
|
|
||||||
return tasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id IN ?", taskIDs).
|
|
||||||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
|
||||||
Find(&tasks)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return tasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateAsyncTaskStatus updates the status of an async task
|
|
||||||
func UpdateAsyncTaskStatus(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, status orm.AsyncTaskStatus) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTask{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Update("status", status)
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateAsyncTaskProgress updates the progress of an async task
|
|
||||||
func UpdateAsyncTaskProgress(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, progress int) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTask{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Update("progress", progress)
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompleteAsyncTask marks an async task as completed with timestamp
|
|
||||||
func CompleteAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTask{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Updates(map[string]any{
|
|
||||||
"status": orm.AsyncTaskStatusCompleted,
|
|
||||||
"finished_at": timestamp,
|
|
||||||
"progress": 100,
|
|
||||||
})
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// FailAsyncTask marks an async task as failed with timestamp
|
|
||||||
func FailAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTask{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Updates(map[string]any{
|
|
||||||
"status": orm.AsyncTaskStatusFailed,
|
|
||||||
"finished_at": timestamp,
|
|
||||||
})
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateAsyncTaskResult creates a result record for an async task
|
|
||||||
func CreateAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
|
|
||||||
taskResult := &orm.AsyncTaskResult{
|
|
||||||
TaskID: taskID,
|
|
||||||
Result: result,
|
|
||||||
}
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
resultOp := tx.WithContext(cancelCtx).Create(taskResult)
|
|
||||||
return resultOp.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateAsyncTaskResultWithError updates a task result with error information
|
|
||||||
func UpdateAsyncTaskResultWithError(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, code int, message string, detail orm.JSONMap) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Update with error information
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTaskResult{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Updates(map[string]any{
|
|
||||||
"error_code": code,
|
|
||||||
"error_message": message,
|
|
||||||
"error_detail": detail,
|
|
||||||
"result": nil,
|
|
||||||
})
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateAsyncTaskResultWithSuccess updates a task result with success information
|
|
||||||
func UpdateAsyncTaskResultWithSuccess(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// First try to update existing record, if not found create new one
|
|
||||||
existingResult := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
FirstOrCreate(&orm.AsyncTaskResult{TaskID: taskID})
|
|
||||||
|
|
||||||
if existingResult.Error != nil {
|
|
||||||
return existingResult.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update with success information
|
|
||||||
updateResult := tx.WithContext(cancelCtx).
|
|
||||||
Model(&orm.AsyncTaskResult{}).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
Updates(map[string]any{
|
|
||||||
"result": result,
|
|
||||||
"error_code": nil,
|
|
||||||
"error_message": nil,
|
|
||||||
"error_detail": nil,
|
|
||||||
})
|
|
||||||
|
|
||||||
return updateResult.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAsyncTaskResult retrieves the result of an async task
|
|
||||||
func GetAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTaskResult, error) {
|
|
||||||
var taskResult orm.AsyncTaskResult
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id = ?", taskID).
|
|
||||||
First(&taskResult)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
if result.Error == gorm.ErrRecordNotFound {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return &taskResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAsyncTaskResults retrieves multiple task results by task IDs
|
|
||||||
func GetAsyncTaskResults(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTaskResult, error) {
|
|
||||||
var taskResults []orm.AsyncTaskResult
|
|
||||||
|
|
||||||
if len(taskIDs) == 0 {
|
|
||||||
return taskResults, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id IN ?", taskIDs).
|
|
||||||
Find(&taskResults)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return taskResults, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPendingTasks retrieves pending tasks (submitted but not yet running/completed)
|
|
||||||
func GetPendingTasks(ctx context.Context, tx *gorm.DB, limit int) ([]orm.AsyncTask, error) {
|
|
||||||
var tasks []orm.AsyncTask
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("status = ?", orm.AsyncTaskStatusSubmitted).
|
|
||||||
Order("created_at ASC").
|
|
||||||
Limit(limit).
|
|
||||||
Find(&tasks)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return tasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTasksByStatus retrieves tasks by status
|
|
||||||
func GetTasksByStatus(ctx context.Context, tx *gorm.DB, status orm.AsyncTaskStatus, limit int) ([]orm.AsyncTask, error) {
|
|
||||||
var tasks []orm.AsyncTask
|
|
||||||
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("status = ?", status).
|
|
||||||
Order("created_at ASC").
|
|
||||||
Limit(limit).
|
|
||||||
Find(&tasks)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return tasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteOldTasks deletes tasks older than the specified timestamp
|
|
||||||
func DeleteOldTasks(ctx context.Context, tx *gorm.DB, olderThan int64) error {
|
|
||||||
// ctx timeout judgment
|
|
||||||
cancelCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// First delete task results
|
|
||||||
result := tx.WithContext(cancelCtx).
|
|
||||||
Where("task_id IN (SELECT task_id FROM async_task WHERE created_at < ?)", olderThan).
|
|
||||||
Delete(&orm.AsyncTaskResult{})
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then delete tasks
|
|
||||||
result = tx.WithContext(cancelCtx).
|
|
||||||
Where("created_at < ?", olderThan).
|
|
||||||
Delete(&orm.AsyncTask{})
|
|
||||||
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
@ -53,8 +53,7 @@ func FillingLongTokenModel(ctx context.Context, tx *gorm.DB, identModel *model.L
|
||||||
func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.IndentityTokenModelInterface, error) {
|
func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.IndentityTokenModelInterface, error) {
|
||||||
identSlice := strings.Split(identToken, ".")
|
identSlice := strings.Split(identToken, ".")
|
||||||
identSliceLen := len(identSlice)
|
identSliceLen := len(identSlice)
|
||||||
switch identSliceLen {
|
if identSliceLen == 4 {
|
||||||
case 4:
|
|
||||||
// token1.token2.token3.token4.token7
|
// token1.token2.token3.token4.token7
|
||||||
shortIndentModel := &model.ShortIdentityTokenModel{
|
shortIndentModel := &model.ShortIdentityTokenModel{
|
||||||
GridTag: identSlice[0],
|
GridTag: identSlice[0],
|
||||||
|
|
@ -68,7 +67,7 @@ func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken strin
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return shortIndentModel, nil
|
return shortIndentModel, nil
|
||||||
case 7:
|
} else if identSliceLen == 7 {
|
||||||
// token1.token2.token3.token4.token5.token6.token7
|
// token1.token2.token3.token4.token5.token6.token7
|
||||||
longIndentModel := &model.LongIdentityTokenModel{
|
longIndentModel := &model.LongIdentityTokenModel{
|
||||||
GridTag: identSlice[0],
|
GridTag: identSlice[0],
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
|
||||||
|
|
||||||
attrSlice := strings.Split(attrToken, ".")
|
attrSlice := strings.Split(attrToken, ".")
|
||||||
attrLen := len(attrSlice)
|
attrLen := len(attrSlice)
|
||||||
switch attrLen {
|
if attrLen == 4 {
|
||||||
case 4:
|
|
||||||
short := &model.ShortAttrInfo{
|
short := &model.ShortAttrInfo{
|
||||||
AttrGroupName: attrSlice[2],
|
AttrGroupName: attrSlice[2],
|
||||||
AttrKey: attrSlice[3],
|
AttrKey: attrSlice[3],
|
||||||
|
|
@ -36,7 +35,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
|
||||||
}
|
}
|
||||||
short.AttrValue = attrValue
|
short.AttrValue = attrValue
|
||||||
return short, nil
|
return short, nil
|
||||||
case 7:
|
} else if attrLen == 7 {
|
||||||
long := &model.LongAttrInfo{
|
long := &model.LongAttrInfo{
|
||||||
AttrGroupName: attrSlice[5],
|
AttrGroupName: attrSlice[5],
|
||||||
AttrKey: attrSlice[6],
|
AttrKey: attrSlice[6],
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,11 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/orm"
|
|
||||||
|
|
||||||
"gorm.io/driver/postgres"
|
"gorm.io/driver/postgres"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
|
@ -14,36 +15,32 @@ import (
|
||||||
var (
|
var (
|
||||||
postgresOnce sync.Once
|
postgresOnce sync.Once
|
||||||
_globalPostgresClient *gorm.DB
|
_globalPostgresClient *gorm.DB
|
||||||
|
_globalPostgresMu sync.RWMutex
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetPostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
|
// GetPostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
|
||||||
func GetPostgresDBClient() *gorm.DB {
|
func GetPostgresDBClient() *gorm.DB {
|
||||||
return _globalPostgresClient
|
_globalPostgresMu.RLock()
|
||||||
|
client := _globalPostgresClient
|
||||||
|
_globalPostgresMu.RUnlock()
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitPostgresDBInstance return instance of PostgresDB client
|
// InitPostgresDBInstance return instance of PostgresDB client
|
||||||
func InitPostgresDBInstance(PostgresDBURI string) *gorm.DB {
|
func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
|
||||||
postgresOnce.Do(func() {
|
postgresOnce.Do(func() {
|
||||||
_globalPostgresClient = initPostgresDBClient(PostgresDBURI)
|
_globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI)
|
||||||
})
|
})
|
||||||
return _globalPostgresClient
|
return _globalPostgresClient
|
||||||
}
|
}
|
||||||
|
|
||||||
// initPostgresDBClient return successfully initialized PostgresDB client
|
// initPostgresDBClient return successfully initialized PostgresDB client
|
||||||
func initPostgresDBClient(PostgresDBURI string) *gorm.DB {
|
func initPostgresDBClient(ctx context.Context, PostgresDBURI string) *gorm.DB {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
db, err := gorm.Open(postgres.Open(PostgresDBURI), &gorm.Config{Logger: logger.NewGormLogger()})
|
db, err := gorm.Open(postgres.Open(PostgresDBURI), &gorm.Config{Logger: logger.NewGormLogger()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auto migrate async task tables
|
|
||||||
err = db.AutoMigrate(
|
|
||||||
&orm.AsyncTask{},
|
|
||||||
&orm.AsyncTaskResult{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryByZRange define func to query real time data from redis zset
|
// QueryByZRangeByLex define func to query real time data from redis zset
|
||||||
func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
||||||
client := rc.Client
|
client := rc.Client
|
||||||
args := redis.ZRangeArgs{
|
args := redis.ZRangeArgs{
|
||||||
Key: key,
|
Key: key,
|
||||||
|
|
|
||||||
|
|
@ -16,15 +16,13 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// initClient define func of return successfully initialized redis client
|
// initClient define func of return successfully initialized redis client
|
||||||
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
func initClient(rCfg config.RedisConfig) *redis.Client {
|
||||||
client, err := util.NewRedisClient(
|
client, err := util.NewRedisClient(
|
||||||
rCfg.Addr,
|
rCfg.Addr,
|
||||||
util.WithPassword(rCfg.Password, deployEnv),
|
util.WithPassword(rCfg.Password),
|
||||||
util.WithDB(rCfg.DB),
|
util.WithDB(rCfg.DB),
|
||||||
util.WithPoolSize(rCfg.PoolSize),
|
util.WithPoolSize(rCfg.PoolSize),
|
||||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
|
||||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitRedisClientInstance define func of return instance of redis client
|
// InitRedisClientInstance define func of return instance of redis client
|
||||||
func InitRedisClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
func InitRedisClientInstance(rCfg config.RedisConfig) *redis.Client {
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
_globalStorageClient = initClient(rCfg, deployEnv)
|
_globalStorageClient = initClient(rCfg)
|
||||||
})
|
})
|
||||||
return _globalStorageClient
|
return _globalStorageClient
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,15 +16,13 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// initClient define func of return successfully initialized redis client
|
// initClient define func of return successfully initialized redis client
|
||||||
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
func initClient(rCfg config.RedisConfig) *redis.Client {
|
||||||
client, err := util.NewRedisClient(
|
client, err := util.NewRedisClient(
|
||||||
rCfg.Addr,
|
rCfg.Addr,
|
||||||
util.WithPassword(rCfg.Password, deployEnv),
|
util.WithPassword(rCfg.Password),
|
||||||
util.WithDB(rCfg.DB),
|
util.WithDB(rCfg.DB),
|
||||||
util.WithPoolSize(rCfg.PoolSize),
|
util.WithPoolSize(rCfg.PoolSize),
|
||||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
|
||||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitClientInstance define func of return instance of redis client
|
// InitClientInstance define func of return instance of redis client
|
||||||
func InitClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
|
func InitClientInstance(rCfg config.RedisConfig) *redis.Client {
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
_globalLockerClient = initClient(rCfg, deployEnv)
|
_globalLockerClient = initClient(rCfg)
|
||||||
})
|
})
|
||||||
return _globalLockerClient
|
return _globalLockerClient
|
||||||
}
|
}
|
||||||
|
|
|
||||||
4
go.mod
4
go.mod
|
|
@ -13,15 +13,14 @@ require (
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||||
github.com/panjf2000/ants/v2 v2.10.0
|
github.com/panjf2000/ants/v2 v2.10.0
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0
|
|
||||||
github.com/redis/go-redis/v9 v9.7.3
|
github.com/redis/go-redis/v9 v9.7.3
|
||||||
github.com/spf13/viper v1.19.0
|
github.com/spf13/viper v1.19.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
github.com/swaggo/files v1.0.1
|
github.com/swaggo/files v1.0.1
|
||||||
github.com/swaggo/gin-swagger v1.6.0
|
github.com/swaggo/gin-swagger v1.6.0
|
||||||
github.com/swaggo/swag v1.16.4
|
github.com/swaggo/swag v1.16.4
|
||||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
|
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
|
golang.org/x/sys v0.28.0
|
||||||
gorm.io/driver/mysql v1.5.7
|
gorm.io/driver/mysql v1.5.7
|
||||||
gorm.io/driver/postgres v1.5.9
|
gorm.io/driver/postgres v1.5.9
|
||||||
gorm.io/gorm v1.25.12
|
gorm.io/gorm v1.25.12
|
||||||
|
|
@ -82,7 +81,6 @@ require (
|
||||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
||||||
golang.org/x/net v0.32.0 // indirect
|
golang.org/x/net v0.32.0 // indirect
|
||||||
golang.org/x/sync v0.10.0 // indirect
|
golang.org/x/sync v0.10.0 // indirect
|
||||||
golang.org/x/sys v0.28.0 // indirect
|
|
||||||
golang.org/x/text v0.21.0 // indirect
|
golang.org/x/text v0.21.0 // indirect
|
||||||
golang.org/x/tools v0.28.0 // indirect
|
golang.org/x/tools v0.28.0 // indirect
|
||||||
google.golang.org/protobuf v1.35.2 // indirect
|
google.golang.org/protobuf v1.35.2 // indirect
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -121,8 +121,6 @@ github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xl
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
|
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
|
||||||
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
|
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
|
||||||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
||||||
|
|
@ -164,8 +162,6 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
|
||||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||||
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
|
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
|
||||||
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
|
|
||||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
|
|
||||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,10 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"modelRT/alert"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
"modelRT/real-time-data/alert"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,677 +0,0 @@
|
||||||
// Package handler provides HTTP handlers for various endpoints.
|
|
||||||
package handler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/database"
|
|
||||||
"modelRT/logger"
|
|
||||||
"modelRT/network"
|
|
||||||
"modelRT/orm"
|
|
||||||
_ "modelRT/task"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTaskCreateHandler handles creation of asynchronous tasks
|
|
||||||
// @Summary 创建异步任务
|
|
||||||
// @Description 创建新的异步任务并返回任务ID,任务将被提交到队列等待处理
|
|
||||||
// @Tags AsyncTask
|
|
||||||
// @Accept json
|
|
||||||
// @Produce json
|
|
||||||
// @Param request body network.AsyncTaskCreateRequest true "任务创建请求"
|
|
||||||
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskCreateResponse} "任务创建成功"
|
|
||||||
// @Failure 400 {object} network.FailureResponse "请求参数错误"
|
|
||||||
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
|
|
||||||
// @Router /task/async [post]
|
|
||||||
func AsyncTaskCreateHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
var request network.AsyncTaskCreateRequest
|
|
||||||
|
|
||||||
if err := c.ShouldBindJSON(&request); err != nil {
|
|
||||||
logger.Error(ctx, "failed to unmarshal async task create request", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid request parameters",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate task type
|
|
||||||
if !orm.IsValidAsyncTaskType(request.TaskType) {
|
|
||||||
logger.Error(ctx, "invalid task type", "task_type", request.TaskType)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task type",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate task parameters based on task type
|
|
||||||
if !validateTaskParams(request.TaskType, request.Params) {
|
|
||||||
logger.Error(ctx, "invalid task parameters", "task_type", request.TaskType, "params", request.Params)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task parameters",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create task in database
|
|
||||||
taskType := orm.AsyncTaskType(request.TaskType)
|
|
||||||
params := orm.JSONMap(request.Params)
|
|
||||||
|
|
||||||
asyncTask, err := database.CreateAsyncTask(ctx, db, taskType, params)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to create async task in database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to create task",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create task queue message
|
|
||||||
// taskQueueMsg := task.NewTaskQueueMessage(asyncTask.TaskID, task.TaskType(request.TaskType))
|
|
||||||
|
|
||||||
// TODO: Send task to message queue (RabbitMQ/Redis)
|
|
||||||
// This should be implemented when message queue integration is ready
|
|
||||||
// For now, we'll just log the task creation
|
|
||||||
logger.Info(ctx, "async task created successfully", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
|
|
||||||
|
|
||||||
// Return success response
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "task created successfully",
|
|
||||||
Payload: network.AsyncTaskCreateResponse{
|
|
||||||
TaskID: asyncTask.TaskID,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskResultQueryHandler handles querying of asynchronous task results
|
|
||||||
// @Summary 查询异步任务结果
|
|
||||||
// @Description 根据任务ID列表查询异步任务的状态和结果
|
|
||||||
// @Tags AsyncTask
|
|
||||||
// @Accept json
|
|
||||||
// @Produce json
|
|
||||||
// @Param task_ids query string true "任务ID列表,用逗号分隔"
|
|
||||||
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResultQueryResponse} "查询成功"
|
|
||||||
// @Failure 400 {object} network.FailureResponse "请求参数错误"
|
|
||||||
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
|
|
||||||
// @Router /task/async/results [get]
|
|
||||||
func AsyncTaskResultQueryHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
|
|
||||||
// Parse task IDs from query parameter
|
|
||||||
taskIDsParam := c.Query("task_ids")
|
|
||||||
if taskIDsParam == "" {
|
|
||||||
logger.Error(ctx, "task_ids parameter is required")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "task_ids parameter is required",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse comma-separated task IDs
|
|
||||||
var taskIDs []uuid.UUID
|
|
||||||
taskIDStrs := splitCommaSeparated(taskIDsParam)
|
|
||||||
for _, taskIDStr := range taskIDStrs {
|
|
||||||
taskID, err := uuid.FromString(taskIDStr)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task ID format",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
taskIDs = append(taskIDs, taskID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(taskIDs) == 0 {
|
|
||||||
logger.Error(ctx, "no valid task IDs provided")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "no valid task IDs provided",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query tasks from database
|
|
||||||
asyncTasks, err := database.GetAsyncTasksByIDs(ctx, db, taskIDs)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to query async tasks from database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to query tasks",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query task results from database
|
|
||||||
taskResults, err := database.GetAsyncTaskResults(ctx, db, taskIDs)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to query async task results from database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to query task results",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a map of task results for easy lookup
|
|
||||||
taskResultMap := make(map[uuid.UUID]orm.AsyncTaskResult)
|
|
||||||
for _, result := range taskResults {
|
|
||||||
taskResultMap[result.TaskID] = result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to response format
|
|
||||||
var responseTasks []network.AsyncTaskResult
|
|
||||||
for _, asyncTask := range asyncTasks {
|
|
||||||
taskResult := network.AsyncTaskResult{
|
|
||||||
TaskID: asyncTask.TaskID,
|
|
||||||
TaskType: string(asyncTask.TaskType),
|
|
||||||
Status: string(asyncTask.Status),
|
|
||||||
CreatedAt: asyncTask.CreatedAt,
|
|
||||||
FinishedAt: asyncTask.FinishedAt,
|
|
||||||
Progress: asyncTask.Progress,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add result or error information if available
|
|
||||||
if result, exists := taskResultMap[asyncTask.TaskID]; exists {
|
|
||||||
if result.Result != nil {
|
|
||||||
taskResult.Result = map[string]any(result.Result)
|
|
||||||
}
|
|
||||||
if result.ErrorCode != nil {
|
|
||||||
taskResult.ErrorCode = result.ErrorCode
|
|
||||||
}
|
|
||||||
if result.ErrorMessage != nil {
|
|
||||||
taskResult.ErrorMessage = result.ErrorMessage
|
|
||||||
}
|
|
||||||
if result.ErrorDetail != nil {
|
|
||||||
taskResult.ErrorDetail = map[string]any(result.ErrorDetail)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
responseTasks = append(responseTasks, taskResult)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return success response
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "query completed",
|
|
||||||
Payload: network.AsyncTaskResultQueryResponse{
|
|
||||||
Total: len(responseTasks),
|
|
||||||
Tasks: responseTasks,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskProgressUpdateHandler handles updating task progress (internal use, not exposed via API)
|
|
||||||
func AsyncTaskProgressUpdateHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
var request network.AsyncTaskProgressUpdate
|
|
||||||
|
|
||||||
if err := c.ShouldBindJSON(&request); err != nil {
|
|
||||||
logger.Error(ctx, "failed to unmarshal async task progress update request", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid request parameters",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update task progress
|
|
||||||
err := database.UpdateAsyncTaskProgress(ctx, db, request.TaskID, request.Progress)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to update async task progress", "task_id", request.TaskID, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to update task progress",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "task progress updated successfully",
|
|
||||||
Payload: nil,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskStatusUpdateHandler handles updating task status (internal use, not exposed via API)
|
|
||||||
func AsyncTaskStatusUpdateHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
var request network.AsyncTaskStatusUpdate
|
|
||||||
|
|
||||||
if err := c.ShouldBindJSON(&request); err != nil {
|
|
||||||
logger.Error(ctx, "failed to unmarshal async task status update request", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid request parameters",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate status
|
|
||||||
validStatus := map[string]bool{
|
|
||||||
string(orm.AsyncTaskStatusSubmitted): true,
|
|
||||||
string(orm.AsyncTaskStatusRunning): true,
|
|
||||||
string(orm.AsyncTaskStatusCompleted): true,
|
|
||||||
string(orm.AsyncTaskStatusFailed): true,
|
|
||||||
}
|
|
||||||
|
|
||||||
if !validStatus[request.Status] {
|
|
||||||
logger.Error(ctx, "invalid task status", "status", request.Status)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task status",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update task status
|
|
||||||
status := orm.AsyncTaskStatus(request.Status)
|
|
||||||
err := database.UpdateAsyncTaskStatus(ctx, db, request.TaskID, status)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to update async task status", "task_id", request.TaskID, "status", request.Status, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to update task status",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// If task is completed or failed, update finished_at timestamp
|
|
||||||
if request.Status == string(orm.AsyncTaskStatusCompleted) {
|
|
||||||
err = database.CompleteAsyncTask(ctx, db, request.TaskID, request.Timestamp)
|
|
||||||
} else if request.Status == string(orm.AsyncTaskStatusFailed) {
|
|
||||||
err = database.FailAsyncTask(ctx, db, request.TaskID, request.Timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to update async task completion timestamp", "task_id", request.TaskID, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to update task completion timestamp",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "task status updated successfully",
|
|
||||||
Payload: nil,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper functions
|
|
||||||
|
|
||||||
func validateTaskParams(taskType string, params map[string]any) bool {
|
|
||||||
switch taskType {
|
|
||||||
case string(orm.AsyncTaskTypeTopologyAnalysis):
|
|
||||||
return validateTopologyAnalysisParams(params)
|
|
||||||
case string(orm.AsyncTaskTypePerformanceAnalysis):
|
|
||||||
return validatePerformanceAnalysisParams(params)
|
|
||||||
case string(orm.AsyncTaskTypeEventAnalysis):
|
|
||||||
return validateEventAnalysisParams(params)
|
|
||||||
case string(orm.AsyncTaskTypeBatchImport):
|
|
||||||
return validateBatchImportParams(params)
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func validateTopologyAnalysisParams(params map[string]any) bool {
|
|
||||||
// Check required parameters for topology analysis
|
|
||||||
if startUUID, ok := params["start_uuid"]; !ok || startUUID == "" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if endUUID, ok := params["end_uuid"]; !ok || endUUID == "" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func validatePerformanceAnalysisParams(params map[string]any) bool {
|
|
||||||
// Check required parameters for performance analysis
|
|
||||||
if componentIDs, ok := params["component_ids"]; !ok {
|
|
||||||
return false
|
|
||||||
} else if ids, isSlice := componentIDs.([]interface{}); !isSlice || len(ids) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func validateEventAnalysisParams(params map[string]any) bool {
|
|
||||||
// Check required parameters for event analysis
|
|
||||||
if eventType, ok := params["event_type"]; !ok || eventType == "" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func validateBatchImportParams(params map[string]any) bool {
|
|
||||||
// Check required parameters for batch import
|
|
||||||
if filePath, ok := params["file_path"]; !ok || filePath == "" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func splitCommaSeparated(s string) []string {
|
|
||||||
var result []string
|
|
||||||
var current strings.Builder
|
|
||||||
inQuotes := false
|
|
||||||
escape := false
|
|
||||||
|
|
||||||
for _, ch := range s {
|
|
||||||
if escape {
|
|
||||||
current.WriteRune(ch)
|
|
||||||
escape = false
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ch {
|
|
||||||
case '\\':
|
|
||||||
escape = true
|
|
||||||
case '"':
|
|
||||||
inQuotes = !inQuotes
|
|
||||||
case ',':
|
|
||||||
if !inQuotes {
|
|
||||||
result = append(result, strings.TrimSpace(current.String()))
|
|
||||||
current.Reset()
|
|
||||||
} else {
|
|
||||||
current.WriteRune(ch)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
current.WriteRune(ch)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if current.Len() > 0 {
|
|
||||||
result = append(result, strings.TrimSpace(current.String()))
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDBFromContext(c *gin.Context) *gorm.DB {
|
|
||||||
// Try to get database connection from context
|
|
||||||
// This should be set by middleware
|
|
||||||
if db, exists := c.Get("db"); exists {
|
|
||||||
if gormDB, ok := db.(*gorm.DB); ok {
|
|
||||||
return gormDB
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback to global database connection
|
|
||||||
// This should be implemented based on your application's database setup
|
|
||||||
// For now, return nil - actual implementation should retrieve from application context
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskResultDetailHandler handles detailed query of a single async task result
|
|
||||||
// @Summary 查询异步任务详情
|
|
||||||
// @Description 根据任务ID查询异步任务的详细状态和结果
|
|
||||||
// @Tags AsyncTask
|
|
||||||
// @Accept json
|
|
||||||
// @Produce json
|
|
||||||
// @Param task_id path string true "任务ID"
|
|
||||||
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResult} "查询成功"
|
|
||||||
// @Failure 400 {object} network.FailureResponse "请求参数错误"
|
|
||||||
// @Failure 404 {object} network.FailureResponse "任务不存在"
|
|
||||||
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
|
|
||||||
// @Router /task/async/{task_id} [get]
|
|
||||||
func AsyncTaskResultDetailHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
|
|
||||||
// Parse task ID from path parameter
|
|
||||||
taskIDStr := c.Param("task_id")
|
|
||||||
if taskIDStr == "" {
|
|
||||||
logger.Error(ctx, "task_id parameter is required")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "task_id parameter is required",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
taskID, err := uuid.FromString(taskIDStr)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task ID format",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query task from database
|
|
||||||
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
|
|
||||||
if err != nil {
|
|
||||||
if err == gorm.ErrRecordNotFound {
|
|
||||||
logger.Error(ctx, "async task not found", "task_id", taskID)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusNotFound,
|
|
||||||
Msg: "task not found",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Error(ctx, "failed to query async task from database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to query task",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query task result from database
|
|
||||||
taskResult, err := database.GetAsyncTaskResult(ctx, db, taskID)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to query async task result from database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to query task result",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to response format
|
|
||||||
responseTask := network.AsyncTaskResult{
|
|
||||||
TaskID: asyncTask.TaskID,
|
|
||||||
TaskType: string(asyncTask.TaskType),
|
|
||||||
Status: string(asyncTask.Status),
|
|
||||||
CreatedAt: asyncTask.CreatedAt,
|
|
||||||
FinishedAt: asyncTask.FinishedAt,
|
|
||||||
Progress: asyncTask.Progress,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add result or error information if available
|
|
||||||
if taskResult != nil {
|
|
||||||
if taskResult.Result != nil {
|
|
||||||
responseTask.Result = map[string]any(taskResult.Result)
|
|
||||||
}
|
|
||||||
if taskResult.ErrorCode != nil {
|
|
||||||
responseTask.ErrorCode = taskResult.ErrorCode
|
|
||||||
}
|
|
||||||
if taskResult.ErrorMessage != nil {
|
|
||||||
responseTask.ErrorMessage = taskResult.ErrorMessage
|
|
||||||
}
|
|
||||||
if taskResult.ErrorDetail != nil {
|
|
||||||
responseTask.ErrorDetail = map[string]any(taskResult.ErrorDetail)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return success response
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "query completed",
|
|
||||||
Payload: responseTask,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskCancelHandler handles cancellation of an async task
|
|
||||||
// @Summary 取消异步任务
|
|
||||||
// @Description 取消指定ID的异步任务(如果任务尚未开始执行)
|
|
||||||
// @Tags AsyncTask
|
|
||||||
// @Accept json
|
|
||||||
// @Produce json
|
|
||||||
// @Param task_id path string true "任务ID"
|
|
||||||
// @Success 200 {object} network.SuccessResponse "任务取消成功"
|
|
||||||
// @Failure 400 {object} network.FailureResponse "请求参数错误或任务无法取消"
|
|
||||||
// @Failure 404 {object} network.FailureResponse "任务不存在"
|
|
||||||
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
|
|
||||||
// @Router /task/async/{task_id}/cancel [post]
|
|
||||||
func AsyncTaskCancelHandler(c *gin.Context) {
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
|
|
||||||
// Parse task ID from path parameter
|
|
||||||
taskIDStr := c.Param("task_id")
|
|
||||||
if taskIDStr == "" {
|
|
||||||
logger.Error(ctx, "task_id parameter is required")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "task_id parameter is required",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
taskID, err := uuid.FromString(taskIDStr)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "invalid task ID format",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get database connection from context or use default
|
|
||||||
db := getDBFromContext(c)
|
|
||||||
if db == nil {
|
|
||||||
logger.Error(ctx, "database connection not found in context")
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "database connection error",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query task from database
|
|
||||||
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
|
|
||||||
if err != nil {
|
|
||||||
if err == gorm.ErrRecordNotFound {
|
|
||||||
logger.Error(ctx, "async task not found", "task_id", taskID)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusNotFound,
|
|
||||||
Msg: "task not found",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Error(ctx, "failed to query async task from database", "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to query task",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if task can be cancelled (only SUBMITTED tasks can be cancelled)
|
|
||||||
if asyncTask.Status != orm.AsyncTaskStatusSubmitted {
|
|
||||||
logger.Error(ctx, "task cannot be cancelled", "task_id", taskID, "status", asyncTask.Status)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusBadRequest,
|
|
||||||
Msg: "task cannot be cancelled (already running or completed)",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update task status to failed with cancellation reason
|
|
||||||
timestamp := time.Now().Unix()
|
|
||||||
err = database.FailAsyncTask(ctx, db, taskID, timestamp)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to cancel async task", "task_id", taskID, "error", err)
|
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
|
||||||
Code: http.StatusInternalServerError,
|
|
||||||
Msg: "failed to cancel task",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update task result with cancellation error
|
|
||||||
err = database.UpdateAsyncTaskResultWithError(ctx, db, taskID, 40003, "task cancelled by user", orm.JSONMap{
|
|
||||||
"cancelled_at": timestamp,
|
|
||||||
"cancelled_by": "user",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to update task result with cancellation error", "task_id", taskID, "error", err)
|
|
||||||
// Continue anyway since task is already marked as failed
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
|
||||||
Code: 2000,
|
|
||||||
Msg: "task cancelled successfully",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
"modelRT/constants"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -16,7 +16,7 @@ func AttrDeleteHandler(c *gin.Context) {
|
||||||
var request network.AttrDeleteRequest
|
var request network.AttrDeleteRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -17,7 +17,7 @@ func AttrGetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
"modelRT/constants"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -17,7 +17,7 @@ func AttrSetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
|
|
@ -44,7 +43,7 @@ func DiagramNodeLinkHandler(c *gin.Context) {
|
||||||
var request network.DiagramNodeLinkRequest
|
var request network.DiagramNodeLinkRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
@ -168,7 +167,7 @@ func processLinkSetData(ctx context.Context, action string, level int, prevLinkS
|
||||||
err2 = prevLinkSet.SREM(prevMember)
|
err2 = prevLinkSet.SREM(prevMember)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err := common.ErrUnsupportedLinkAction
|
err := constants.ErrUnsupportedLinkAction
|
||||||
logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err)
|
logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,14 +30,3 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
|
||||||
}
|
}
|
||||||
c.JSON(http.StatusOK, resp)
|
c.JSON(http.StatusOK, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
|
|
||||||
resp := network.WSResponse{
|
|
||||||
Code: code,
|
|
||||||
Msg: msg,
|
|
||||||
}
|
|
||||||
if payload != nil {
|
|
||||||
resp.Payload = payload
|
|
||||||
}
|
|
||||||
c.JSON(http.StatusOK, resp)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,10 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"modelRT/alert"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
"modelRT/real-time-data/alert"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -19,7 +19,7 @@ func MeasurementGetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
|
|
@ -21,7 +20,7 @@ func MeasurementLinkHandler(c *gin.Context) {
|
||||||
var request network.MeasurementLinkRequest
|
var request network.MeasurementLinkRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := common.ErrGetClientToken
|
err := constants.ErrGetClientToken
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
@ -94,7 +93,7 @@ func MeasurementLinkHandler(c *gin.Context) {
|
||||||
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err = common.ErrUnsupportedLinkAction
|
err = constants.ErrUnsupportedLinkAction
|
||||||
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,14 +39,20 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
||||||
if clientID == "" {
|
if clientID == "" {
|
||||||
err := fmt.Errorf("clientID is missing from the path")
|
err := fmt.Errorf("clientID is missing from the path")
|
||||||
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
|
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
|
||||||
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
|
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
|
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
|
||||||
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
@ -54,18 +60,9 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
||||||
ctx, cancel := context.WithCancel(c.Request.Context())
|
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
conn.SetCloseHandler(func(code int, text string) error {
|
|
||||||
logger.Info(c.Request.Context(), "websocket processor shutdown trigger",
|
|
||||||
"clientID", clientID, "code", code, "reason", text)
|
|
||||||
|
|
||||||
// call cancel to notify other goroutines to stop working
|
|
||||||
cancel()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
|
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
|
||||||
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
|
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
|
||||||
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
|
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
|
||||||
|
|
||||||
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
|
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
|
||||||
go readClientMessages(ctx, conn, clientID, cancel)
|
go readClientMessages(ctx, conn, clientID, cancel)
|
||||||
|
|
@ -82,33 +79,52 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
||||||
select {
|
select {
|
||||||
case targetData, ok := <-fanInChan:
|
case targetData, ok := <-fanInChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
sendChan <- network.WSResponse{
|
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
|
||||||
Code: constants.RespCodeServerError,
|
|
||||||
Msg: "abnormal shutdown of data fan-in channel",
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
buffer = append(buffer, targetData)
|
buffer = append(buffer, targetData)
|
||||||
|
|
||||||
if len(buffer) >= bufferMaxSize {
|
if len(buffer) >= bufferMaxSize {
|
||||||
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
|
// buffer is full, send immediately
|
||||||
|
select {
|
||||||
|
case sendChan <- buffer:
|
||||||
|
default:
|
||||||
|
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset buffer
|
||||||
|
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||||
|
// reset the ticker to prevent it from triggering immediately after the ticker is sent
|
||||||
ticker.Reset(sendMaxInterval)
|
ticker.Reset(sendMaxInterval)
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if len(buffer) > 0 {
|
if len(buffer) > 0 {
|
||||||
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
|
// when the ticker is triggered, all data in the send buffer is sent
|
||||||
|
select {
|
||||||
|
case sendChan <- buffer:
|
||||||
|
default:
|
||||||
|
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset buffer
|
||||||
|
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// last refresh before exiting
|
// send the last remaining data
|
||||||
if len(buffer) > 0 {
|
if len(buffer) > 0 {
|
||||||
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
|
select {
|
||||||
|
case sendChan <- buffer:
|
||||||
|
default:
|
||||||
|
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
|
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
|
||||||
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
|
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
|
||||||
// conn.SetReadLimit(512)
|
// conn.SetReadLimit(512)
|
||||||
for {
|
for {
|
||||||
|
|
@ -133,47 +149,54 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
|
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
|
||||||
if len(*buffer) == 0 {
|
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
|
||||||
return
|
if len(targetsData) == 0 {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
response := network.SuccessResponse{
|
||||||
resp := network.WSResponse{
|
Code: 200,
|
||||||
Code: constants.RespCodeSuccess,
|
Msg: "success",
|
||||||
Msg: "process completed",
|
|
||||||
Payload: network.RealTimePullPayload{
|
Payload: network.RealTimePullPayload{
|
||||||
Targets: *buffer,
|
Targets: targetsData,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
return conn.WriteJSON(response)
|
||||||
select {
|
|
||||||
case sendChan <- resp:
|
|
||||||
default:
|
|
||||||
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
|
|
||||||
}
|
|
||||||
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
|
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
|
||||||
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
|
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
||||||
for resp := range sendChan {
|
for targetsData := range sendChan {
|
||||||
if err := conn.WriteJSON(resp); err != nil {
|
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
||||||
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
|
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
|
||||||
|
err := conn.WriteJSON(map[string]any{
|
||||||
|
"code": 2101,
|
||||||
|
"msg": "all targets removed in given client_id",
|
||||||
|
"payload": map[string]int{
|
||||||
|
"active_targets_count": 0,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
|
||||||
|
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
|
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
|
||||||
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
|
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
|
||||||
|
// ensure the fanInChan will not leak
|
||||||
|
defer close(fanInChan)
|
||||||
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
|
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
|
||||||
stopChanMap := make(map[string]chan struct{})
|
stopChanMap := make(map[string]chan struct{})
|
||||||
s.globalMutex.RLock()
|
s.globalMutex.RLock()
|
||||||
|
|
@ -360,7 +383,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeTargets define func to stops running polling goroutines for targets that were removed
|
// removeTargets define func to stops running polling goroutines for targets that were removed
|
||||||
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
|
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
|
||||||
for _, target := range removeTargets {
|
for _, target := range removeTargets {
|
||||||
stopChan, exists := stopChanMap[target]
|
stopChan, exists := stopChanMap[target]
|
||||||
if !exists {
|
if !exists {
|
||||||
|
|
@ -379,18 +402,17 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
|
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
|
||||||
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
specialTarget := network.RealTimePullTarget{
|
||||||
resp := network.WSResponse{
|
ID: constants.SysCtrlAllRemoved,
|
||||||
Code: constants.RespCodeSuccessWithNoSub,
|
Datas: []network.RealTimePullData{},
|
||||||
Msg: "all targets removed",
|
|
||||||
Payload: map[string]int{"active_targets_count": 0},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case sendChan <- resp:
|
case sendChan <- []network.RealTimePullTarget{specialTarget}:
|
||||||
|
logger.Info(ctx, "sent 2101 status request to sendChan")
|
||||||
default:
|
default:
|
||||||
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
|
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -401,6 +423,7 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
|
||||||
close(stopChan)
|
close(stopChan)
|
||||||
}
|
}
|
||||||
clear(stopChanMap)
|
clear(stopChanMap)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// redisPollingConfig define struct for param which query real time data from redis
|
// redisPollingConfig define struct for param which query real time data from redis
|
||||||
|
|
@ -440,7 +463,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
|
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
|
||||||
members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
|
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
|
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -168,6 +168,7 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran
|
||||||
}
|
}
|
||||||
transportChannel <- subPoss
|
transportChannel <- subPoss
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// messageTypeToString define func of auxiliary to convert message type to string
|
// messageTypeToString define func of auxiliary to convert message type to string
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"maps"
|
"maps"
|
||||||
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -33,42 +33,42 @@ func init() {
|
||||||
// @Accept json
|
// @Accept json
|
||||||
// @Produce json
|
// @Produce json
|
||||||
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
|
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
|
||||||
// @Success 2000 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||||
//
|
//
|
||||||
// @Example 2000 {
|
// @Example 200 {
|
||||||
// "code": 2000,
|
// "code": 200,
|
||||||
// "msg": "process completed",
|
// "msg": "success",
|
||||||
// "payload": {
|
// "payload": {
|
||||||
// "targets": [
|
// "targets": [
|
||||||
// {
|
// {
|
||||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
|
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
|
||||||
// "code": "20000",
|
// "code": "1001",
|
||||||
// "msg": "subscription success"
|
// "msg": "subscription success"
|
||||||
// },
|
// },
|
||||||
// {
|
// {
|
||||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||||
// "code": "20000",
|
// "code": "1002",
|
||||||
// "msg": "subscription failed"
|
// "msg": "subscription failed"
|
||||||
// }
|
// }
|
||||||
// ]
|
// ]
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @Failure 3000 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||||
//
|
//
|
||||||
// @Example 3000 {
|
// @Example 400 {
|
||||||
// "code": 3000,
|
// "code": 400,
|
||||||
// "msg": "process completed with partial failures",
|
// "msg": "failed to get recommend data from redis",
|
||||||
// "payload": {
|
// "payload": {
|
||||||
// "targets": [
|
// "targets": [
|
||||||
// {
|
// {
|
||||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
|
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
|
||||||
// "code": "40005",
|
// "code": "1002",
|
||||||
// "msg": "subscription failed"
|
// "msg": "subscription failed"
|
||||||
// },
|
// },
|
||||||
// {
|
// {
|
||||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||||
// "code": "50001",
|
// "code": "1002",
|
||||||
// "msg": "subscription failed"
|
// "msg": "subscription failed"
|
||||||
// }
|
// }
|
||||||
// ]
|
// ]
|
||||||
|
|
@ -83,7 +83,10 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
|
|
||||||
if err := c.ShouldBindJSON(&request); err != nil {
|
if err := c.ShouldBindJSON(&request); err != nil {
|
||||||
logger.Error(c, "failed to unmarshal real time query request", "error", err)
|
logger.Error(c, "failed to unmarshal real time query request", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -92,7 +95,10 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
id, err := uuid.NewV4()
|
id, err := uuid.NewV4()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "failed to generate client id", "error", err)
|
logger.Error(c, "failed to generate client id", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
clientID = id.String()
|
clientID = id.String()
|
||||||
|
|
@ -117,74 +123,110 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
|
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "create real time data subscription config failed", "error", err)
|
logger.Error(c, "create real time data subscription config failed", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "success",
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
case constants.SubStopAction:
|
case constants.SubStopAction:
|
||||||
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
|
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
|
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "success",
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
case constants.SubAppendAction:
|
case constants.SubAppendAction:
|
||||||
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
|
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "append target to real time data subscription config failed", "error", err)
|
logger.Error(c, "append target to real time data subscription config failed", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "success",
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
case constants.SubUpdateAction:
|
case constants.SubUpdateAction:
|
||||||
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
|
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "update target to real time data subscription config failed", "error", err)
|
logger.Error(c, "update target to real time data subscription config failed", "error", err)
|
||||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "success",
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := fmt.Errorf("%w: request action is %s", common.ErrUnsupportedSubAction, request.Action)
|
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
||||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
|
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
||||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), network.RealTimeSubPayload{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: network.RealTimeSubPayload{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
TargetResults: results,
|
TargetResults: results,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -241,12 +283,12 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
|
||||||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||||
targetResult.Code = constants.CodeFoundTargetFailed
|
targetResult.Code = constants.SubFailedCode
|
||||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
targetResult.Code = constants.CodeSuccess
|
targetResult.Code = constants.SubSuccessCode
|
||||||
targetResult.Msg = constants.SubSuccessMsg
|
targetResult.Msg = constants.SubSuccessMsg
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
successfulTargets = append(successfulTargets, target)
|
successfulTargets = append(successfulTargets, target)
|
||||||
|
|
@ -285,7 +327,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
||||||
if _, exist := config.targetContext[target]; !exist {
|
if _, exist := config.targetContext[target]; !exist {
|
||||||
err := fmt.Errorf("target %s does not exists in subscription list", target)
|
err := fmt.Errorf("target %s does not exists in subscription list", target)
|
||||||
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
|
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
|
||||||
targetResult.Code = constants.CodeUpdateSubTargetMissing
|
targetResult.Code = constants.UpdateSubFailedCode
|
||||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
continue
|
continue
|
||||||
|
|
@ -294,13 +336,13 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
||||||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||||
targetResult.Code = constants.CodeDBQueryFailed
|
targetResult.Code = constants.UpdateSubFailedCode
|
||||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
targetResult.Code = constants.CodeSuccess
|
targetResult.Code = constants.UpdateSubSuccessCode
|
||||||
targetResult.Msg = constants.UpdateSubSuccessMsg
|
targetResult.Msg = constants.UpdateSubSuccessMsg
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
successfulTargets = append(successfulTargets, target)
|
successfulTargets = append(successfulTargets, target)
|
||||||
|
|
@ -431,7 +473,7 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
|
||||||
if !exist {
|
if !exist {
|
||||||
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
|
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
|
||||||
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
|
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
|
||||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeAppendSubTargetMissing, err), err
|
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||||
}
|
}
|
||||||
|
|
||||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
|
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
|
||||||
|
|
@ -465,7 +507,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS
|
||||||
|
|
||||||
for index := range resultsSlice {
|
for index := range resultsSlice {
|
||||||
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
|
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
|
||||||
resultsSlice[index].Code = constants.CodeSubTargetRepeat
|
resultsSlice[index].Code = constants.SubRepeatCode
|
||||||
resultsSlice[index].Msg = constants.SubRepeatMsg
|
resultsSlice[index].Msg = constants.SubRepeatMsg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -533,7 +575,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
||||||
s.globalMutex.RUnlock()
|
s.globalMutex.RUnlock()
|
||||||
err := fmt.Errorf("clientID %s not found", clientID)
|
err := fmt.Errorf("clientID %s not found", clientID)
|
||||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeCancelSubTargetMissing, err), err
|
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||||
}
|
}
|
||||||
s.globalMutex.RUnlock()
|
s.globalMutex.RUnlock()
|
||||||
|
|
||||||
|
|
@ -553,7 +595,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
||||||
for _, target := range measTargets {
|
for _, target := range measTargets {
|
||||||
targetResult := network.TargetResult{
|
targetResult := network.TargetResult{
|
||||||
ID: target,
|
ID: target,
|
||||||
Code: constants.CodeCancelSubTargetMissing,
|
Code: constants.CancelSubFailedCode,
|
||||||
Msg: constants.CancelSubFailedMsg,
|
Msg: constants.CancelSubFailedMsg,
|
||||||
}
|
}
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
|
|
@ -574,7 +616,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
||||||
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
|
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
|
||||||
targetResult := network.TargetResult{
|
targetResult := network.TargetResult{
|
||||||
ID: existingTarget,
|
ID: existingTarget,
|
||||||
Code: constants.CodeSuccess,
|
Code: constants.CancelSubSuccessCode,
|
||||||
Msg: constants.CancelSubSuccessMsg,
|
Msg: constants.CancelSubSuccessMsg,
|
||||||
}
|
}
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
|
|
@ -597,7 +639,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
||||||
for target := range targetsToRemoveMap {
|
for target := range targetsToRemoveMap {
|
||||||
targetResult := network.TargetResult{
|
targetResult := network.TargetResult{
|
||||||
ID: target,
|
ID: target,
|
||||||
Code: constants.CodeCancelSubTargetMissing,
|
Code: constants.CancelSubFailedCode,
|
||||||
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
|
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
|
||||||
}
|
}
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
|
|
@ -621,15 +663,17 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
||||||
// UpdateTargets define function to update targets in SharedSubState
|
// UpdateTargets define function to update targets in SharedSubState
|
||||||
func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
|
func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
|
||||||
requestTargetsCount := processRealTimeRequestCount(measurements)
|
requestTargetsCount := processRealTimeRequestCount(measurements)
|
||||||
|
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
||||||
|
|
||||||
s.globalMutex.RLock()
|
s.globalMutex.RLock()
|
||||||
config, exist := s.subMap[clientID]
|
config, exist := s.subMap[clientID]
|
||||||
s.globalMutex.RUnlock()
|
s.globalMutex.RUnlock()
|
||||||
|
|
||||||
if !exist {
|
if !exist {
|
||||||
|
s.globalMutex.RUnlock()
|
||||||
err := fmt.Errorf("clientID %s not found", clientID)
|
err := fmt.Errorf("clientID %s not found", clientID)
|
||||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err
|
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||||
}
|
}
|
||||||
|
|
||||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
|
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
|
||||||
|
|
@ -678,13 +722,13 @@ func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem)
|
||||||
return totalTargetsCount
|
return totalTargetsCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, businessCode int, err error) []network.TargetResult {
|
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
|
||||||
targetProcessResults := make([]network.TargetResult, 0, targetCount)
|
targetProcessResults := make([]network.TargetResult, 0, targetCount)
|
||||||
for _, measurementItem := range measurements {
|
for _, measurementItem := range measurements {
|
||||||
for _, target := range measurementItem.Targets {
|
for _, target := range measurementItem.Targets {
|
||||||
var targetResult network.TargetResult
|
var targetResult network.TargetResult
|
||||||
targetResult.ID = target
|
targetResult.ID = target
|
||||||
targetResult.Code = businessCode
|
targetResult.Code = constants.SubFailedCode
|
||||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||||
targetProcessResults = append(targetProcessResults, targetResult)
|
targetProcessResults = append(targetProcessResults, targetResult)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
34
main.go
34
main.go
|
|
@ -13,15 +13,13 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
|
"modelRT/alert"
|
||||||
"modelRT/config"
|
"modelRT/config"
|
||||||
"modelRT/constants"
|
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/model"
|
"modelRT/model"
|
||||||
"modelRT/mq"
|
|
||||||
"modelRT/pool"
|
"modelRT/pool"
|
||||||
"modelRT/real-time-data/alert"
|
|
||||||
"modelRT/router"
|
"modelRT/router"
|
||||||
"modelRT/util"
|
"modelRT/util"
|
||||||
|
|
||||||
|
|
@ -100,14 +98,14 @@ func main() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceName, modelRTConfig.SecretKey)
|
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceConfig.ServiceName, modelRTConfig.ServiceConfig.SecretKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "generate client token failed", "error", err)
|
logger.Error(ctx, "generate client token failed", "error", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// init postgresDBClient
|
// init postgresDBClient
|
||||||
postgresDBClient = database.InitPostgresDBInstance(modelRTConfig.PostgresDBURI)
|
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
sqlDB, err := postgresDBClient.DB()
|
sqlDB, err := postgresDBClient.DB()
|
||||||
|
|
@ -129,17 +127,13 @@ func main() {
|
||||||
defer parsePool.Release()
|
defer parsePool.Release()
|
||||||
|
|
||||||
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
|
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "init redigo pool failed", "error", err)
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer searchPool.Close()
|
defer searchPool.Close()
|
||||||
model.InitAutocompleterWithPool(searchPool)
|
model.InitAutocompleterWithPool(searchPool)
|
||||||
|
|
||||||
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig, modelRTConfig.DeployEnv)
|
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig)
|
||||||
defer storageClient.Close()
|
defer storageClient.Close()
|
||||||
|
|
||||||
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig, modelRTConfig.DeployEnv)
|
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig)
|
||||||
defer lockerClient.Close()
|
defer lockerClient.Close()
|
||||||
|
|
||||||
// init anchor param ants pool
|
// init anchor param ants pool
|
||||||
|
|
@ -150,11 +144,6 @@ func main() {
|
||||||
}
|
}
|
||||||
defer anchorRealTimePool.Release()
|
defer anchorRealTimePool.Release()
|
||||||
|
|
||||||
// init rabbitmq connection
|
|
||||||
mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig)
|
|
||||||
// async push event to rabbitMQ
|
|
||||||
go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan)
|
|
||||||
|
|
||||||
postgresDBClient.Transaction(func(tx *gorm.DB) error {
|
postgresDBClient.Transaction(func(tx *gorm.DB) error {
|
||||||
// load circuit diagram from postgres
|
// load circuit diagram from postgres
|
||||||
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
|
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
|
||||||
|
|
@ -204,7 +193,7 @@ func main() {
|
||||||
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement)
|
go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement)
|
||||||
|
|
||||||
tree, err := database.QueryTopologicFromDB(ctx, tx)
|
tree, err := database.QueryTopologicFromDB(ctx, tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -215,10 +204,8 @@ func main() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// use release mode in production
|
// use release mode in productio
|
||||||
if modelRTConfig.DeployEnv == constants.ProductionDeployMode {
|
// gin.SetMode(gin.ReleaseMode)
|
||||||
gin.SetMode(gin.ReleaseMode)
|
|
||||||
}
|
|
||||||
engine := gin.New()
|
engine := gin.New()
|
||||||
router.RegisterRoutes(engine, serviceToken)
|
router.RegisterRoutes(engine, serviceToken)
|
||||||
|
|
||||||
|
|
@ -236,7 +223,7 @@ func main() {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
server := http.Server{
|
server := http.Server{
|
||||||
Addr: modelRTConfig.ServiceAddr,
|
Addr: modelRTConfig.ServiceConfig.ServiceAddr,
|
||||||
Handler: engine,
|
Handler: engine,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -245,12 +232,9 @@ func main() {
|
||||||
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
go func() {
|
go func() {
|
||||||
<-done
|
<-done
|
||||||
logger.Info(ctx, "shutdown signal received, cleaning up...")
|
|
||||||
if err := server.Shutdown(context.Background()); err != nil {
|
if err := server.Shutdown(context.Background()); err != nil {
|
||||||
logger.Error(ctx, "shutdown serverError", "err", err)
|
logger.Error(ctx, "shutdown serverError", "err", err)
|
||||||
}
|
}
|
||||||
mq.CloseRabbitProxy()
|
|
||||||
logger.Info(ctx, "resources cleaned up, exiting")
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
logger.Info(ctx, "starting ModelRT server")
|
logger.Info(ctx, "starting ModelRT server")
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -62,7 +61,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
|
||||||
switch prefix {
|
switch prefix {
|
||||||
case constants.ChannelPrefixTelemetry:
|
case constants.ChannelPrefixTelemetry:
|
||||||
if number > 10 {
|
if number > 10 {
|
||||||
return "", common.ErrExceedsLimitType
|
return "", constants.ErrExceedsLimitType
|
||||||
}
|
}
|
||||||
var builder strings.Builder
|
var builder strings.Builder
|
||||||
numberStr := strconv.Itoa(number)
|
numberStr := strconv.Itoa(number)
|
||||||
|
|
@ -87,7 +86,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
|
||||||
channelName := builder.String()
|
channelName := builder.String()
|
||||||
return channelName, nil
|
return channelName, nil
|
||||||
default:
|
default:
|
||||||
return "", common.ErrUnsupportedChannelPrefixType
|
return "", constants.ErrUnsupportedChannelPrefixType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -165,14 +164,14 @@ func (m MeasurementDataSource) GetIOAddress() (IOAddress, error) {
|
||||||
if addr, ok := m.IOAddress.(CL3611Address); ok {
|
if addr, ok := m.IOAddress.(CL3611Address); ok {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
return nil, common.ErrInvalidAddressType
|
return nil, constants.ErrInvalidAddressType
|
||||||
case constants.DataSourceTypePower104:
|
case constants.DataSourceTypePower104:
|
||||||
if addr, ok := m.IOAddress.(Power104Address); ok {
|
if addr, ok := m.IOAddress.(Power104Address); ok {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
return nil, common.ErrInvalidAddressType
|
return nil, constants.ErrInvalidAddressType
|
||||||
default:
|
default:
|
||||||
return nil, common.ErrUnknownDataType
|
return nil, constants.ErrUnknownDataType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,39 +0,0 @@
|
||||||
// Package event define real time data evnet operation functions
|
|
||||||
package event
|
|
||||||
|
|
||||||
// EventRecord define struct for CIM event record
|
|
||||||
type EventRecord struct {
|
|
||||||
// 事件名称
|
|
||||||
EventName string `json:"event"`
|
|
||||||
// 事件唯一标识符
|
|
||||||
EventUUID string `json:"event_uuid"`
|
|
||||||
// 事件类型
|
|
||||||
Type int `json:"type"`
|
|
||||||
// 事件优先级 (0-9)
|
|
||||||
Priority int `json:"priority"`
|
|
||||||
// 事件状态
|
|
||||||
Status int `json:"status"`
|
|
||||||
// 可选模板参数
|
|
||||||
Category string `json:"category,omitempty"`
|
|
||||||
// 毫秒级时间戳 (Unix epoch)
|
|
||||||
Timestamp int64 `json:"timestamp"`
|
|
||||||
// 事件来源 (station, platform, msa)
|
|
||||||
From string `json:"from"`
|
|
||||||
// 事件场景描述对象 (如阈值、当前值)
|
|
||||||
Condition map[string]any `json:"condition"`
|
|
||||||
// 与事件相关的订阅信息
|
|
||||||
AttachedSubscriptions []any `json:"attached_subscriptions"`
|
|
||||||
// 事件分析结果对象
|
|
||||||
Result map[string]any `json:"result,omitempty"`
|
|
||||||
// 操作历史记录 (CIM ActivityRecord)
|
|
||||||
Operations []OperationRecord `json:"operations"`
|
|
||||||
// 子站告警原始数据 (CIM Alarm 数据)
|
|
||||||
Origin map[string]any `json:"origin,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等
|
|
||||||
type OperationRecord struct {
|
|
||||||
Action string `json:"action"` // 执行的动作,如 "acknowledgment"
|
|
||||||
Op string `json:"op"` // 操作人/操作账号标识
|
|
||||||
TS int64 `json:"ts"` // 操作发生的毫秒时间戳
|
|
||||||
}
|
|
||||||
|
|
@ -1,82 +0,0 @@
|
||||||
// Package event define real time data evnet operation functions
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
type actionHandler func(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error)
|
|
||||||
|
|
||||||
// actionDispatchMap define variable to store all action handler into map
|
|
||||||
var actionDispatchMap = map[string]actionHandler{
|
|
||||||
"info": handleInfoAction,
|
|
||||||
"warning": handleWarningAction,
|
|
||||||
"error": handleErrorAction,
|
|
||||||
"critical": handleCriticalAction,
|
|
||||||
"exception": handleExceptionAction,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TriggerEventAction define func to trigger event by action in compute config
|
|
||||||
func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
handler, exists := actionDispatchMap[command]
|
|
||||||
if !exists {
|
|
||||||
logger.Error(ctx, "unknown action command", "command", command)
|
|
||||||
return nil, common.ErrUnknowEventActionCommand
|
|
||||||
}
|
|
||||||
|
|
||||||
eventRecord, err := handler(ctx, eventName, ops...)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "action event handler failed", "error", err)
|
|
||||||
return nil, common.ErrExecEventActionFailed
|
|
||||||
}
|
|
||||||
return eventRecord, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
logger.Info(ctx, "trigger info event", "event_name", eventName)
|
|
||||||
eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "generate info event record failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return eventRecord, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
logger.Info(ctx, "trigger warning event", "event_name", eventName)
|
|
||||||
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "generate warning event record failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return eventRecord, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
logger.Info(ctx, "trigger error event", "event_name", eventName)
|
|
||||||
eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "generate error event record failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return eventRecord, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send critical level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
|
|
||||||
// 实际执行发送警告、记录日志等操作
|
|
||||||
actionParams := content
|
|
||||||
// ... logic to send except level event using actionParams ...
|
|
||||||
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
@ -1,85 +0,0 @@
|
||||||
// Package event define real time data evnet operation functions
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"maps"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
// EventOption define option function type for event record creation
|
|
||||||
type EventOption func(*EventRecord)
|
|
||||||
|
|
||||||
// WithCondition define option function to set event condition description
|
|
||||||
func WithCondition(cond map[string]any) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
if cond != nil {
|
|
||||||
e.Condition = cond
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithSubscriptions define option function to set event attached subscription information
|
|
||||||
func WithSubscriptions(subs []any) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
if subs != nil {
|
|
||||||
e.AttachedSubscriptions = subs
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithOperations define option function to set event operation records
|
|
||||||
func WithOperations(ops []OperationRecord) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
if ops != nil {
|
|
||||||
e.Operations = ops
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithCategory define option function to set event category
|
|
||||||
func WithCategory(cat string) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
e.Category = cat
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithResult define option function to set event analysis result
|
|
||||||
func WithResult(result map[string]any) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
e.Result = result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithTEAnalysisResult(breachType string) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
if e.Result == nil {
|
|
||||||
e.Result = make(map[string]any)
|
|
||||||
}
|
|
||||||
|
|
||||||
description := "数据异常"
|
|
||||||
switch strings.ToLower(breachType) {
|
|
||||||
case "upup":
|
|
||||||
description = "超越上上限"
|
|
||||||
case "up":
|
|
||||||
description = "超越上限"
|
|
||||||
case "down":
|
|
||||||
description = "超越下限"
|
|
||||||
case "downdown":
|
|
||||||
description = "超越下下限"
|
|
||||||
}
|
|
||||||
|
|
||||||
e.Result["analysis_desc"] = description
|
|
||||||
e.Result["breach_type"] = breachType
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithConditionValue define option function to set event condition with real time value and extra data
|
|
||||||
func WithConditionValue(realTimeValue []float64, extraData map[string]any) EventOption {
|
|
||||||
return func(e *EventRecord) {
|
|
||||||
if e.Condition == nil {
|
|
||||||
e.Condition = make(map[string]any)
|
|
||||||
}
|
|
||||||
e.Condition["real_time_value"] = realTimeValue
|
|
||||||
maps.Copy(e.Condition, extraData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,68 +0,0 @@
|
||||||
// Package event define real time data evnet operation functions
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/constants"
|
|
||||||
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewPlatformEventRecord define func to create a new platform event record with common fields initialized
|
|
||||||
func NewPlatformEventRecord(eventType int, priority int, eventName string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
u, err := uuid.NewV4()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to generate UUID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
record := &EventRecord{
|
|
||||||
EventName: eventName,
|
|
||||||
EventUUID: u.String(),
|
|
||||||
Type: eventType,
|
|
||||||
Priority: priority,
|
|
||||||
Status: 1,
|
|
||||||
From: constants.EventFromPlatform,
|
|
||||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
|
||||||
Condition: make(map[string]any),
|
|
||||||
AttachedSubscriptions: make([]any, 0),
|
|
||||||
Operations: make([]OperationRecord, 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
return record, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewGeneralPlatformSoftRecord define func to create a new general platform software event record
|
|
||||||
func NewGeneralPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventGeneralPlatformSoft), 0, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewGeneralApplicationSoftRecord define func to create a new general application software event record
|
|
||||||
func NewGeneralApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventGeneralApplicationSoft), 0, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWarnPlatformSoftRecord define func to create a new warning platform software event record
|
|
||||||
func NewWarnPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventWarnPlatformSoft), 3, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWarnApplicationSoftRecord define func to create a new warning application software event record
|
|
||||||
func NewWarnApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventWarnApplicationSoft), 3, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCriticalPlatformSoftRecord define func to create a new critical platform software event record
|
|
||||||
func NewCriticalPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventCriticalPlatformSoft), 6, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCriticalApplicationSoftRecord define func to create a new critical application software event record
|
|
||||||
func NewCriticalApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
|
|
||||||
return NewPlatformEventRecord(int(constants.EventCriticalApplicationSoft), 6, name, opts...)
|
|
||||||
}
|
|
||||||
|
|
@ -1,146 +0,0 @@
|
||||||
// Package mq provides read or write access to message queue services
|
|
||||||
package mq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/constants"
|
|
||||||
"modelRT/logger"
|
|
||||||
"modelRT/mq/event"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
|
|
||||||
var MsgChan chan *event.EventRecord
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
MsgChan = make(chan *event.EventRecord, 10000)
|
|
||||||
}
|
|
||||||
|
|
||||||
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
|
||||||
var channel *amqp.Channel
|
|
||||||
var err error
|
|
||||||
|
|
||||||
channel, err = GetConn().Channel()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "declare event exchange failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
args := amqp.Table{
|
|
||||||
"x-max-length": int32(50),
|
|
||||||
"x-dead-letter-exchange": constants.EventDeadExchangeName,
|
|
||||||
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
|
|
||||||
}
|
|
||||||
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "declare event queue failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := channel.Confirm(false); err != nil {
|
|
||||||
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return channel, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
|
|
||||||
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) {
|
|
||||||
channel, err := initUpDownLimitEventChannel(ctx)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO 使用配置修改确认模式通道参数
|
|
||||||
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case confirm, ok := <-confirms:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !confirm.Ack {
|
|
||||||
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
|
||||||
channel.Close()
|
|
||||||
return
|
|
||||||
case eventRecord, ok := <-msgChan:
|
|
||||||
if !ok {
|
|
||||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
|
||||||
channel.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key
|
|
||||||
recordBytes, err := json.Marshal(eventRecord)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// send event alarm message to rabbitMQ queue
|
|
||||||
routingKey := eventRecord.Category
|
|
||||||
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
err = channel.PublishWithContext(pubCtx,
|
|
||||||
constants.EventExchangeName, // exchange
|
|
||||||
routingKey, // routing key
|
|
||||||
false, // mandatory
|
|
||||||
false, // immediate
|
|
||||||
amqp.Publishing{
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: recordBytes,
|
|
||||||
})
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,217 +0,0 @@
|
||||||
// Package mq define message queue operation functions
|
|
||||||
package mq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
|
||||||
"crypto/x509"
|
|
||||||
"encoding/pem"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/config"
|
|
||||||
"modelRT/logger"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
"github.com/youmark/pkcs8"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
_globalRabbitMQProxy *RabbitMQProxy
|
|
||||||
rabbitMQOnce sync.Once
|
|
||||||
)
|
|
||||||
|
|
||||||
// RabbitMQProxy define stuct of rabbitMQ connection proxy
|
|
||||||
type RabbitMQProxy struct {
|
|
||||||
tlsConf *tls.Config
|
|
||||||
conn *amqp.Connection
|
|
||||||
cancel context.CancelFunc
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
|
|
||||||
type rabbitMQCertConf struct {
|
|
||||||
serverName string
|
|
||||||
insecureSkipVerify bool
|
|
||||||
clientCert tls.Certificate
|
|
||||||
caCertPool *x509.CertPool
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetConn define func to return the rabbitMQ connection
|
|
||||||
func GetConn() *amqp.Connection {
|
|
||||||
_globalRabbitMQProxy.mu.Lock()
|
|
||||||
defer _globalRabbitMQProxy.mu.Unlock()
|
|
||||||
return _globalRabbitMQProxy.conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// InitRabbitProxy return instance of rabbitMQ connection
|
|
||||||
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
|
|
||||||
amqpURI := generateRabbitMQURI(rCfg)
|
|
||||||
tlsConf, err := initCertConf(rCfg)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
rabbitMQOnce.Do(func() {
|
|
||||||
cancelCtx, cancel := context.WithCancel(ctx)
|
|
||||||
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
|
|
||||||
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
|
|
||||||
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
|
|
||||||
})
|
|
||||||
return _globalRabbitMQProxy
|
|
||||||
}
|
|
||||||
|
|
||||||
// initRabbitMQ return instance of rabbitMQ connection
|
|
||||||
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
|
|
||||||
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
|
|
||||||
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
|
||||||
TLSClientConfig: tlsConf,
|
|
||||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
|
||||||
Heartbeat: 10 * time.Second,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
|
|
||||||
for {
|
|
||||||
closeChan := make(chan *amqp.Error)
|
|
||||||
GetConn().NotifyClose(closeChan)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Info(ctx, "context cancelled, exiting handleReconnect")
|
|
||||||
return
|
|
||||||
case err, ok := <-closeChan:
|
|
||||||
if !ok {
|
|
||||||
logger.Info(ctx, "rabbitMQ notify channel closed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !p.reconnect(ctx, rabbitMQURI) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
|
|
||||||
for {
|
|
||||||
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
|
||||||
TLSClientConfig: p.tlsConf,
|
|
||||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
|
||||||
Heartbeat: 10 * time.Second,
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
p.mu.Lock()
|
|
||||||
p.conn = newConn
|
|
||||||
p.mu.Unlock()
|
|
||||||
logger.Info(ctx, "rabbitMQ reconnected successfully")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
|
|
||||||
func CloseRabbitProxy() {
|
|
||||||
if _globalRabbitMQProxy != nil {
|
|
||||||
_globalRabbitMQProxy.cancel()
|
|
||||||
_globalRabbitMQProxy.mu.Lock()
|
|
||||||
if _globalRabbitMQProxy.conn != nil {
|
|
||||||
_globalRabbitMQProxy.conn.Close()
|
|
||||||
}
|
|
||||||
_globalRabbitMQProxy.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
|
|
||||||
// TODO 考虑拆分用户名密码配置项,兼容不同认证方式
|
|
||||||
// user := url.QueryEscape(rCfg.User)
|
|
||||||
// password := url.QueryEscape(rCfg.Password)
|
|
||||||
|
|
||||||
// amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
|
|
||||||
// user,
|
|
||||||
// password,
|
|
||||||
// rCfg.Host,
|
|
||||||
// rCfg.Port,
|
|
||||||
// )
|
|
||||||
amqpURI := fmt.Sprintf("amqps://%s:%d/",
|
|
||||||
rCfg.Host,
|
|
||||||
rCfg.Port,
|
|
||||||
)
|
|
||||||
return amqpURI
|
|
||||||
}
|
|
||||||
|
|
||||||
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
|
|
||||||
tlsConf := &tls.Config{
|
|
||||||
InsecureSkipVerify: rCfg.InsecureSkipVerify,
|
|
||||||
ServerName: rCfg.ServerName,
|
|
||||||
}
|
|
||||||
|
|
||||||
caCert, err := os.ReadFile(rCfg.CACertPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read server ca file failed: %w", err)
|
|
||||||
}
|
|
||||||
caCertPool := x509.NewCertPool()
|
|
||||||
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
|
||||||
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
|
|
||||||
}
|
|
||||||
tlsConf.RootCAs = caCertPool
|
|
||||||
|
|
||||||
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read client cert file failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read private key file failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
block, _ := pem.Decode(keyData)
|
|
||||||
if block == nil {
|
|
||||||
return nil, fmt.Errorf("failed to decode PEM block from private key")
|
|
||||||
}
|
|
||||||
|
|
||||||
der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
privBytes, err := x509.MarshalPKCS8PrivateKey(der)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal private key failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
|
|
||||||
|
|
||||||
clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("create x509 key pair failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tlsConf.Certificates = []tls.Certificate{clientCert}
|
|
||||||
return tlsConf, nil
|
|
||||||
}
|
|
||||||
|
|
@ -1,95 +0,0 @@
|
||||||
// Package network define struct of network operation
|
|
||||||
package network
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTaskCreateRequest defines the request structure for creating an asynchronous task
|
|
||||||
type AsyncTaskCreateRequest struct {
|
|
||||||
// required: true
|
|
||||||
// enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT
|
|
||||||
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"异步任务类型"`
|
|
||||||
// required: true
|
|
||||||
Params map[string]interface{} `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskCreateResponse defines the response structure for creating an asynchronous task
|
|
||||||
type AsyncTaskCreateResponse struct {
|
|
||||||
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskResultQueryRequest defines the request structure for querying task results
|
|
||||||
type AsyncTaskResultQueryRequest struct {
|
|
||||||
// required: true
|
|
||||||
TaskIDs []uuid.UUID `json:"task_ids" swaggertype:"array,string" example:"[\"123e4567-e89b-12d3-a456-426614174000\",\"223e4567-e89b-12d3-a456-426614174001\"]" description:"任务ID列表"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskResult defines the structure for a single task result
|
|
||||||
type AsyncTaskResult struct {
|
|
||||||
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
|
|
||||||
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"`
|
|
||||||
Status string `json:"status" example:"COMPLETED" description:"任务状态:SUBMITTED, RUNNING, COMPLETED, FAILED"`
|
|
||||||
Progress *int `json:"progress,omitempty" example:"65" description:"任务进度(0-100),仅当状态为RUNNING时返回"`
|
|
||||||
CreatedAt int64 `json:"created_at" example:"1741846200" description:"任务创建时间戳"`
|
|
||||||
FinishedAt *int64 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳,仅当状态为COMPLETED或FAILED时返回"`
|
|
||||||
Result map[string]interface{} `json:"result,omitempty" swaggertype:"object" description:"任务结果,仅当状态为COMPLETED时返回"`
|
|
||||||
ErrorCode *int `json:"error_code,omitempty" example:"400102" description:"错误码,仅当状态为FAILED时返回"`
|
|
||||||
ErrorMessage *string `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息,仅当状态为FAILED时返回"`
|
|
||||||
ErrorDetail map[string]interface{} `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情,仅当状态为FAILED时返回"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskResultQueryResponse defines the response structure for querying task results
|
|
||||||
type AsyncTaskResultQueryResponse struct {
|
|
||||||
Total int `json:"total" example:"3" description:"查询的任务总数"`
|
|
||||||
Tasks []AsyncTaskResult `json:"tasks" description:"任务结果列表"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskProgressUpdate defines the structure for task progress update
|
|
||||||
type AsyncTaskProgressUpdate struct {
|
|
||||||
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
|
|
||||||
Progress int `json:"progress" example:"50" description:"任务进度(0-100)"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncTaskStatusUpdate defines the structure for task status update
|
|
||||||
type AsyncTaskStatusUpdate struct {
|
|
||||||
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
|
|
||||||
Status string `json:"status" example:"RUNNING" description:"任务状态:SUBMITTED, RUNNING, COMPLETED, FAILED"`
|
|
||||||
Timestamp int64 `json:"timestamp" example:"1741846205" description:"状态更新时间戳"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TopologyAnalysisParams defines the parameters for topology analysis task
|
|
||||||
type TopologyAnalysisParams struct {
|
|
||||||
StartUUID string `json:"start_uuid" example:"comp-001" description:"起始元件UUID"`
|
|
||||||
EndUUID string `json:"end_uuid" example:"comp-999" description:"目标元件UUID"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// PerformanceAnalysisParams defines the parameters for performance analysis task
|
|
||||||
type PerformanceAnalysisParams struct {
|
|
||||||
ComponentIDs []string `json:"component_ids" example:"[\"comp-001\",\"comp-002\"]" description:"需要分析的元件ID列表"`
|
|
||||||
TimeRange struct {
|
|
||||||
Start time.Time `json:"start" example:"2026-03-01T00:00:00Z" description:"分析开始时间"`
|
|
||||||
End time.Time `json:"end" example:"2026-03-02T00:00:00Z" description:"分析结束时间"`
|
|
||||||
} `json:"time_range" description:"分析时间范围"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventAnalysisParams defines the parameters for event analysis task
|
|
||||||
type EventAnalysisParams struct {
|
|
||||||
EventType string `json:"event_type" example:"MOTOR_START" description:"事件类型"`
|
|
||||||
StartTime time.Time `json:"start_time" example:"2026-03-01T00:00:00Z" description:"事件开始时间"`
|
|
||||||
EndTime time.Time `json:"end_time" example:"2026-03-02T00:00:00Z" description:"事件结束时间"`
|
|
||||||
Components []string `json:"components,omitempty" example:"[\"comp-001\",\"comp-002\"]" description:"关联的元件列表"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// BatchImportParams defines the parameters for batch import task
|
|
||||||
type BatchImportParams struct {
|
|
||||||
FilePath string `json:"file_path" example:"/data/import/model.csv" description:"导入文件路径"`
|
|
||||||
FileType string `json:"file_type" example:"CSV" description:"文件类型:CSV, JSON, XML"`
|
|
||||||
Options struct {
|
|
||||||
Overwrite bool `json:"overwrite" example:"false" description:"是否覆盖现有数据"`
|
|
||||||
Validate bool `json:"validate" example:"true" description:"是否进行数据验证"`
|
|
||||||
NotifyUser bool `json:"notify_user" example:"true" description:"是否通知用户"`
|
|
||||||
} `json:"options" description:"导入选项"`
|
|
||||||
}
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/common"
|
|
||||||
"modelRT/common/errcode"
|
"modelRT/common/errcode"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
|
|
@ -65,10 +64,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
switch info.ChangeType {
|
switch info.ChangeType {
|
||||||
case constants.UUIDFromChangeType:
|
case constants.UUIDFromChangeType:
|
||||||
if info.NewUUIDFrom == info.OldUUIDFrom {
|
if info.NewUUIDFrom == info.OldUUIDFrom {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT1)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT1)
|
||||||
}
|
}
|
||||||
if info.NewUUIDTo != info.OldUUIDTo {
|
if info.NewUUIDTo != info.OldUUIDTo {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT1)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT1)
|
||||||
}
|
}
|
||||||
|
|
||||||
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
||||||
|
|
@ -91,10 +90,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
UUIDChangeInfo.NewUUIDTo = OldUUIDTo
|
UUIDChangeInfo.NewUUIDTo = OldUUIDTo
|
||||||
case constants.UUIDToChangeType:
|
case constants.UUIDToChangeType:
|
||||||
if info.NewUUIDFrom != info.OldUUIDFrom {
|
if info.NewUUIDFrom != info.OldUUIDFrom {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT2)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT2)
|
||||||
}
|
}
|
||||||
if info.NewUUIDTo == info.OldUUIDTo {
|
if info.NewUUIDTo == info.OldUUIDTo {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT2)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT2)
|
||||||
}
|
}
|
||||||
|
|
||||||
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
||||||
|
|
@ -117,10 +116,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
UUIDChangeInfo.NewUUIDTo = newUUIDTo
|
UUIDChangeInfo.NewUUIDTo = newUUIDTo
|
||||||
case constants.UUIDAddChangeType:
|
case constants.UUIDAddChangeType:
|
||||||
if info.OldUUIDFrom != "" {
|
if info.OldUUIDFrom != "" {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT3)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT3)
|
||||||
}
|
}
|
||||||
if info.OldUUIDTo != "" {
|
if info.OldUUIDTo != "" {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT3)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT3)
|
||||||
}
|
}
|
||||||
|
|
||||||
newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom)
|
newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom)
|
||||||
|
|
|
||||||
|
|
@ -3,25 +3,18 @@ package network
|
||||||
|
|
||||||
// FailureResponse define struct of standard failure API response format
|
// FailureResponse define struct of standard failure API response format
|
||||||
type FailureResponse struct {
|
type FailureResponse struct {
|
||||||
Code int `json:"code" example:"3000"`
|
Code int `json:"code" example:"500"`
|
||||||
Msg string `json:"msg" example:"process completed with partial failures"`
|
Msg string `json:"msg" example:"failed to get recommend data from redis"`
|
||||||
Payload any `json:"payload" swaggertype:"object"`
|
Payload any `json:"payload" swaggertype:"object"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SuccessResponse define struct of standard successful API response format
|
// SuccessResponse define struct of standard successful API response format
|
||||||
type SuccessResponse struct {
|
type SuccessResponse struct {
|
||||||
Code int `json:"code" example:"2000"`
|
Code int `json:"code" example:"200"`
|
||||||
Msg string `json:"msg" example:"process completed"`
|
Msg string `json:"msg" example:"success"`
|
||||||
Payload any `json:"payload" swaggertype:"object"`
|
Payload any `json:"payload" swaggertype:"object"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// WSResponse define struct of standard websocket API response format
|
|
||||||
type WSResponse struct {
|
|
||||||
Code int `json:"code" example:"2000"`
|
|
||||||
Msg string `json:"msg" example:"process completed"`
|
|
||||||
Payload any `json:"payload,omitempty" swaggertype:"object"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
|
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
|
||||||
type MeasurementRecommendPayload struct {
|
type MeasurementRecommendPayload struct {
|
||||||
Input string `json:"input" example:"transformfeeder1_220."`
|
Input string `json:"input" example:"transformfeeder1_220."`
|
||||||
|
|
@ -33,7 +26,7 @@ type MeasurementRecommendPayload struct {
|
||||||
// TargetResult define struct of target item in real time data subscription response payload
|
// TargetResult define struct of target item in real time data subscription response payload
|
||||||
type TargetResult struct {
|
type TargetResult struct {
|
||||||
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
|
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
|
||||||
Code int `json:"code" example:"20000"`
|
Code string `json:"code" example:"1001"`
|
||||||
Msg string `json:"msg" example:"subscription success"`
|
Msg string `json:"msg" example:"subscription success"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,115 +0,0 @@
|
||||||
// Package orm define database data struct
|
|
||||||
package orm
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTaskType defines the type of asynchronous task
|
|
||||||
type AsyncTaskType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
// AsyncTaskTypeTopologyAnalysis represents topology analysis task
|
|
||||||
AsyncTaskTypeTopologyAnalysis AsyncTaskType = "TOPOLOGY_ANALYSIS"
|
|
||||||
// AsyncTaskTypePerformanceAnalysis represents performance analysis task
|
|
||||||
AsyncTaskTypePerformanceAnalysis AsyncTaskType = "PERFORMANCE_ANALYSIS"
|
|
||||||
// AsyncTaskTypeEventAnalysis represents event analysis task
|
|
||||||
AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS"
|
|
||||||
// AsyncTaskTypeBatchImport represents batch import task
|
|
||||||
AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTaskStatus defines the status of asynchronous task
|
|
||||||
type AsyncTaskStatus string
|
|
||||||
|
|
||||||
const (
|
|
||||||
// AsyncTaskStatusSubmitted represents task has been submitted to queue
|
|
||||||
AsyncTaskStatusSubmitted AsyncTaskStatus = "SUBMITTED"
|
|
||||||
// AsyncTaskStatusRunning represents task is currently executing
|
|
||||||
AsyncTaskStatusRunning AsyncTaskStatus = "RUNNING"
|
|
||||||
// AsyncTaskStatusCompleted represents task completed successfully
|
|
||||||
AsyncTaskStatusCompleted AsyncTaskStatus = "COMPLETED"
|
|
||||||
// AsyncTaskStatusFailed represents task failed with error
|
|
||||||
AsyncTaskStatusFailed AsyncTaskStatus = "FAILED"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTask defines the core task entity stored in database for task lifecycle tracking
|
|
||||||
type AsyncTask struct {
|
|
||||||
TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid;default:gen_random_uuid()"`
|
|
||||||
TaskType AsyncTaskType `gorm:"column:task_type;type:varchar(50);not null"`
|
|
||||||
Status AsyncTaskStatus `gorm:"column:status;type:varchar(20);not null;index"`
|
|
||||||
Params JSONMap `gorm:"column:params;type:jsonb"`
|
|
||||||
CreatedAt int64 `gorm:"column:created_at;not null;index"`
|
|
||||||
FinishedAt *int64 `gorm:"column:finished_at;index"`
|
|
||||||
Progress *int `gorm:"column:progress"` // 0-100, nullable
|
|
||||||
}
|
|
||||||
|
|
||||||
// TableName returns the table name for AsyncTask model
|
|
||||||
func (a *AsyncTask) TableName() string {
|
|
||||||
return "async_task"
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetSubmitted marks the task as submitted
|
|
||||||
func (a *AsyncTask) SetSubmitted() {
|
|
||||||
a.Status = AsyncTaskStatusSubmitted
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRunning marks the task as running
|
|
||||||
func (a *AsyncTask) SetRunning() {
|
|
||||||
a.Status = AsyncTaskStatusRunning
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetCompleted marks the task as completed with finished timestamp
|
|
||||||
func (a *AsyncTask) SetCompleted(timestamp int64) {
|
|
||||||
a.Status = AsyncTaskStatusCompleted
|
|
||||||
a.FinishedAt = ×tamp
|
|
||||||
a.setProgress(100)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFailed marks the task as failed with finished timestamp
|
|
||||||
func (a *AsyncTask) SetFailed(timestamp int64) {
|
|
||||||
a.Status = AsyncTaskStatusFailed
|
|
||||||
a.FinishedAt = ×tamp
|
|
||||||
}
|
|
||||||
|
|
||||||
// setProgress updates the task progress (0-100)
|
|
||||||
func (a *AsyncTask) setProgress(value int) {
|
|
||||||
if value < 0 {
|
|
||||||
value = 0
|
|
||||||
}
|
|
||||||
if value > 100 {
|
|
||||||
value = 100
|
|
||||||
}
|
|
||||||
a.Progress = &value
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateProgress updates the task progress with validation
|
|
||||||
func (a *AsyncTask) UpdateProgress(value int) {
|
|
||||||
a.setProgress(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsCompleted checks if the task is completed
|
|
||||||
func (a *AsyncTask) IsCompleted() bool {
|
|
||||||
return a.Status == AsyncTaskStatusCompleted
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsRunning checks if the task is running
|
|
||||||
func (a *AsyncTask) IsRunning() bool {
|
|
||||||
return a.Status == AsyncTaskStatusRunning
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsFailed checks if the task failed
|
|
||||||
func (a *AsyncTask) IsFailed() bool {
|
|
||||||
return a.Status == AsyncTaskStatusFailed
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsValidTaskType checks if the task type is valid
|
|
||||||
func IsValidAsyncTaskType(taskType string) bool {
|
|
||||||
switch AsyncTaskType(taskType) {
|
|
||||||
case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis,
|
|
||||||
AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,70 +0,0 @@
|
||||||
// Package orm define database data struct
|
|
||||||
package orm
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AsyncTaskResult stores computation results, separate from AsyncTask model for flexibility
|
|
||||||
type AsyncTaskResult struct {
|
|
||||||
TaskID uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid"`
|
|
||||||
Result JSONMap `gorm:"column:result;type:jsonb"`
|
|
||||||
ErrorCode *int `gorm:"column:error_code"`
|
|
||||||
ErrorMessage *string `gorm:"column:error_message;type:text"`
|
|
||||||
ErrorDetail JSONMap `gorm:"column:error_detail;type:jsonb"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TableName returns the table name for AsyncTaskResult model
|
|
||||||
func (a *AsyncTaskResult) TableName() string {
|
|
||||||
return "async_task_result"
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetSuccess sets the result for successful task execution
|
|
||||||
func (a *AsyncTaskResult) SetSuccess(result JSONMap) {
|
|
||||||
a.Result = result
|
|
||||||
a.ErrorCode = nil
|
|
||||||
a.ErrorMessage = nil
|
|
||||||
a.ErrorDetail = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetError sets the error information for failed task execution
|
|
||||||
func (a *AsyncTaskResult) SetError(code int, message string, detail JSONMap) {
|
|
||||||
a.Result = nil
|
|
||||||
a.ErrorCode = &code
|
|
||||||
a.ErrorMessage = &message
|
|
||||||
a.ErrorDetail = detail
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasError checks if the task result contains an error
|
|
||||||
func (a *AsyncTaskResult) HasError() bool {
|
|
||||||
return a.ErrorCode != nil || a.ErrorMessage != nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetErrorCode returns the error code or 0 if no error
|
|
||||||
func (a *AsyncTaskResult) GetErrorCode() int {
|
|
||||||
if a.ErrorCode == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return *a.ErrorCode
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetErrorMessage returns the error message or empty string if no error
|
|
||||||
func (a *AsyncTaskResult) GetErrorMessage() string {
|
|
||||||
if a.ErrorMessage == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return *a.ErrorMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsSuccess checks if the task execution was successful
|
|
||||||
func (a *AsyncTaskResult) IsSuccess() bool {
|
|
||||||
return !a.HasError()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear clears all result data
|
|
||||||
func (a *AsyncTaskResult) Clear() {
|
|
||||||
a.Result = nil
|
|
||||||
a.ErrorCode = nil
|
|
||||||
a.ErrorMessage = nil
|
|
||||||
a.ErrorDetail = nil
|
|
||||||
}
|
|
||||||
|
|
@ -5,11 +5,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"modelRT/alert"
|
||||||
"modelRT/config"
|
"modelRT/config"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/real-time-data/alert"
|
|
||||||
|
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,7 @@ import (
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/mq"
|
"modelRT/real-time-data/event"
|
||||||
"modelRT/mq/event"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
||||||
|
|
@ -27,13 +26,6 @@ type teEventThresholds struct {
|
||||||
isFloatCause bool
|
isFloatCause bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type teBreachTrigger struct {
|
|
||||||
breachType string
|
|
||||||
triggered bool
|
|
||||||
triggeredValues []float64
|
|
||||||
eventOpts []event.EventOption
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
||||||
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
||||||
t := teEventThresholds{}
|
t := teEventThresholds{}
|
||||||
|
|
@ -92,74 +84,60 @@ func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeCo
|
||||||
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
||||||
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
||||||
windowSize := conf.minBreachCount
|
windowSize := conf.minBreachCount
|
||||||
dataLen := len(realTimeValues)
|
if windowSize <= 0 {
|
||||||
if dataLen < windowSize || windowSize <= 0 {
|
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
statusArray := make([]string, dataLen)
|
// mark whether any events have been triggered in this batch
|
||||||
for i, val := range realTimeValues {
|
var eventTriggered bool
|
||||||
statusArray[i] = getTEBreachType(val, thresholds)
|
breachTriggers := map[string]bool{
|
||||||
|
"up": false, "upup": false, "down": false, "downdown": false,
|
||||||
}
|
}
|
||||||
|
|
||||||
breachTriggers := make(map[string]teBreachTrigger)
|
// implement slide window to determine breach counts
|
||||||
for i := 0; i <= dataLen-windowSize; i++ {
|
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
||||||
firstBreachType := statusArray[i]
|
window := realTimeValues[i : i+windowSize]
|
||||||
|
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
||||||
|
|
||||||
// if the first value in the window does not breach, skip this window directly
|
if firstValueBreachType == "" {
|
||||||
if firstBreachType == "" {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
allMatch := true
|
allMatch := true
|
||||||
for j := 1; j < windowSize; j++ {
|
for j := 1; j < windowSize; j++ {
|
||||||
if statusArray[i+j] != firstBreachType {
|
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
||||||
|
if currentValueBreachType != firstValueBreachType {
|
||||||
allMatch = false
|
allMatch = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if allMatch {
|
if allMatch {
|
||||||
triggerValues := realTimeValues[i : i+windowSize]
|
|
||||||
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
||||||
_, exists := breachTriggers[firstBreachType]
|
if !breachTriggers[firstValueBreachType] {
|
||||||
if !exists {
|
// trigger event
|
||||||
logger.Warn(ctx, "event triggered by sliding window",
|
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
||||||
"breach_type", firstBreachType,
|
|
||||||
"trigger_values", triggerValues)
|
|
||||||
|
|
||||||
// build Options
|
breachTriggers[firstValueBreachType] = true
|
||||||
opts := []event.EventOption{
|
eventTriggered = true
|
||||||
event.WithConditionValue(triggerValues, conf.Cause),
|
|
||||||
event.WithTEAnalysisResult(firstBreachType),
|
|
||||||
event.WithCategory(constants.EventWarnUpDownLimitCategroy),
|
|
||||||
// TODO 生成 operations并考虑如何放入 event 中
|
|
||||||
// event.WithOperations(nil)
|
|
||||||
}
|
|
||||||
breachTriggers[firstBreachType] = teBreachTrigger{
|
|
||||||
breachType: firstBreachType,
|
|
||||||
triggered: false,
|
|
||||||
triggeredValues: triggerValues,
|
|
||||||
eventOpts: opts,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for breachType, trigger := range breachTriggers {
|
if eventTriggered {
|
||||||
// trigger Action
|
command, content := genTEEventCommandAndContent(ctx, conf.Action)
|
||||||
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
|
// TODO 考虑 content 是否可以为空,先期不允许
|
||||||
eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
|
if command == "" || content == "" {
|
||||||
eventRecord, err := event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "trigger event action failed", "error", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mq.MsgChan <- eventRecord
|
event.TriggerEventAction(ctx, command, content)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func genTEEventCommandAndMainBody(ctx context.Context, action map[string]any) (command string, mainBody string) {
|
func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) {
|
||||||
cmdValue, exist := action["command"]
|
cmdValue, exist := action["command"]
|
||||||
if !exist {
|
if !exist {
|
||||||
logger.Error(ctx, "can not find command variable into action map", "action", action)
|
logger.Error(ctx, "can not find command variable into action map", "action", action)
|
||||||
|
|
@ -207,7 +185,7 @@ type tiEventThresholds struct {
|
||||||
isFloatCause bool
|
isFloatCause bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseTIThresholds define func to parse telesignal thresholds by casue map
|
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
||||||
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
||||||
edgeKey := "edge"
|
edgeKey := "edge"
|
||||||
t := tiEventThresholds{
|
t := tiEventThresholds{
|
||||||
|
|
@ -233,12 +211,11 @@ func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
||||||
|
|
||||||
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
||||||
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
||||||
switch t.edge {
|
if t.edge == constants.TelesignalRaising {
|
||||||
case constants.TelesignalRaising:
|
|
||||||
if previousValue == 0.0 && currentValue == 1.0 {
|
if previousValue == 0.0 && currentValue == 1.0 {
|
||||||
return constants.TIBreachTriggerType
|
return constants.TIBreachTriggerType
|
||||||
}
|
}
|
||||||
case constants.TelesignalFalling:
|
} else if t.edge == constants.TelesignalFalling {
|
||||||
if previousValue == 1.0 && currentValue == 0.0 {
|
if previousValue == 1.0 && currentValue == 0.0 {
|
||||||
return constants.TIBreachTriggerType
|
return constants.TIBreachTriggerType
|
||||||
}
|
}
|
||||||
|
|
@ -320,22 +297,18 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
|
||||||
}
|
}
|
||||||
|
|
||||||
if eventTriggered {
|
if eventTriggered {
|
||||||
command, mainBody := genTIEventCommandAndMainBody(conf.Action)
|
command, content := genTIEventCommandAndContent(conf.Action)
|
||||||
if command == "" || mainBody == "" {
|
// TODO 考虑 content 是否可以为空,先期不允许
|
||||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
|
if command == "" || content == "" {
|
||||||
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
eventRecord, err := event.TriggerEventAction(ctx, command, mainBody)
|
event.TriggerEventAction(ctx, command, content)
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "trigger event action failed", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
mq.MsgChan <- eventRecord
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) {
|
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
|
||||||
cmdValue, exist := action["command"]
|
cmdValue, exist := action["command"]
|
||||||
if !exist {
|
if !exist {
|
||||||
return "", ""
|
return "", ""
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,74 @@
|
||||||
|
// Package event define real time data evnet operation functions
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"modelRT/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type actionHandler func(ctx context.Context, content string) error
|
||||||
|
|
||||||
|
// actionDispatchMap define variable to store all action handler into map
|
||||||
|
var actionDispatchMap = map[string]actionHandler{
|
||||||
|
"info": handleInfoAction,
|
||||||
|
"warning": handleWarningAction,
|
||||||
|
"error": handleErrorAction,
|
||||||
|
"critical": handleCriticalAction,
|
||||||
|
"exception": handleExceptionAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriggerEventAction define func to trigger event by action in compute config
|
||||||
|
func TriggerEventAction(ctx context.Context, command string, content string) {
|
||||||
|
handler, exists := actionDispatchMap[command]
|
||||||
|
if !exists {
|
||||||
|
logger.Error(ctx, "unknown action command", "command", command)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := handler(ctx, content)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Info(ctx, "action handler success", "command", command, "content", content)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleInfoAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send info level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger info event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleWarningAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send warning level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger warning event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleErrorAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send error level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger error event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleCriticalAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send critical level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleExceptionAction(ctx context.Context, content string) error {
|
||||||
|
// 实际执行发送警告、记录日志等操作
|
||||||
|
actionParams := content
|
||||||
|
// ... logic to send except level event using actionParams ...
|
||||||
|
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,400 @@
|
||||||
|
// Package realtimedata define real time data operation functions
|
||||||
|
package realtimedata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
|
"modelRT/diagram"
|
||||||
|
"modelRT/logger"
|
||||||
|
"modelRT/model"
|
||||||
|
"modelRT/network"
|
||||||
|
"modelRT/orm"
|
||||||
|
"modelRT/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// RealTimeDataChan define channel of real time data receive
|
||||||
|
RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
||||||
|
globalComputeState *MeasComputeState
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
||||||
|
globalComputeState = NewMeasComputeState()
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info
|
||||||
|
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
|
||||||
|
for _, measurement := range measurements {
|
||||||
|
enableValue, exist := measurement.EventPlan["enable"]
|
||||||
|
enable, ok := enableValue.(bool)
|
||||||
|
if !exist || !enable {
|
||||||
|
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
conf, err := initComputeConfig(measurement)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf == nil {
|
||||||
|
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uuidStr := measurement.ComponentUUID.String()
|
||||||
|
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
||||||
|
conf.StopGchan = make(chan struct{})
|
||||||
|
globalComputeState.Store(uuidStr, conf)
|
||||||
|
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
go continuousComputation(enrichedCtx, conf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
enableValue, exist := measurement.EventPlan["enable"]
|
||||||
|
enable, ok := enableValue.(bool)
|
||||||
|
if !exist {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !enable {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conf := &ComputeConfig{}
|
||||||
|
|
||||||
|
causeValue, exist := measurement.EventPlan["cause"]
|
||||||
|
if !exist {
|
||||||
|
return nil, errors.New("missing required field cause")
|
||||||
|
}
|
||||||
|
|
||||||
|
cause, ok := causeValue.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
||||||
|
}
|
||||||
|
conf.Cause, err = processCauseMap(cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actionValue, exist := measurement.EventPlan["action"]
|
||||||
|
if !exist {
|
||||||
|
return nil, errors.New("missing required field action")
|
||||||
|
}
|
||||||
|
action, ok := actionValue.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
||||||
|
}
|
||||||
|
conf.Action = action
|
||||||
|
|
||||||
|
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
||||||
|
}
|
||||||
|
conf.QueryKey = queryKey
|
||||||
|
conf.DataSize = int64(measurement.Size)
|
||||||
|
// TODO use constant values for temporary settings
|
||||||
|
conf.minBreachCount = constants.MinBreachCount
|
||||||
|
// TODO 后续优化 duration 创建方式
|
||||||
|
conf.Duration = 10
|
||||||
|
|
||||||
|
isFloatCause := false
|
||||||
|
if _, exists := conf.Cause["up"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["down"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["upup"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
} else if _, exists := conf.Cause["downdown"]; exists {
|
||||||
|
isFloatCause = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if isFloatCause {
|
||||||
|
// te config
|
||||||
|
teThresholds, err := parseTEThresholds(conf.Cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
||||||
|
}
|
||||||
|
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
||||||
|
} else {
|
||||||
|
// ti config
|
||||||
|
tiThresholds, err := parseTIThresholds(conf.Cause)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
||||||
|
}
|
||||||
|
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
||||||
|
}
|
||||||
|
|
||||||
|
return conf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func processCauseMap(data map[string]any) (map[string]any, error) {
|
||||||
|
causeResult := make(map[string]any)
|
||||||
|
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||||
|
|
||||||
|
var foundFloatKey bool
|
||||||
|
for _, key := range keysToExtract {
|
||||||
|
if value, exists := data[key]; exists {
|
||||||
|
|
||||||
|
foundFloatKey = true
|
||||||
|
|
||||||
|
// check value type
|
||||||
|
if floatVal, ok := value.(float64); ok {
|
||||||
|
causeResult[key] = floatVal
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if foundFloatKey == true {
|
||||||
|
return causeResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
edgeKey := "edge"
|
||||||
|
if value, exists := data[edgeKey]; exists {
|
||||||
|
if stringVal, ok := value.(string); ok {
|
||||||
|
switch stringVal {
|
||||||
|
case "raising":
|
||||||
|
fallthrough
|
||||||
|
case "falling":
|
||||||
|
causeResult[edgeKey] = stringVal
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
||||||
|
client := diagram.NewRedisClient()
|
||||||
|
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
||||||
|
duration := util.SecondsToDuration(conf.Duration)
|
||||||
|
ticker := time.NewTicker(duration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-conf.StopGchan:
|
||||||
|
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
realTimedatas := util.ConvertZSetMembersToFloat64(members)
|
||||||
|
if conf.Analyzer != nil {
|
||||||
|
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||||
|
} else {
|
||||||
|
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// // ReceiveChan define func to real time data receive and process
|
||||||
|
// func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
|
||||||
|
// consumer, err := kafka.NewConsumer(consumerConfig)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "create kafka consumer failed", "error", err)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// defer consumer.Close()
|
||||||
|
|
||||||
|
// err = consumer.SubscribeTopics(topics, nil)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// batchSize := 100
|
||||||
|
// batchTimeout := util.SecondsToDuration(duration)
|
||||||
|
// messages := make([]*kafka.Message, 0, batchSize)
|
||||||
|
// lastCommit := time.Now()
|
||||||
|
// logger.Info(ctx, "start consuming from kafka", "topic", topics)
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case <-ctx.Done():
|
||||||
|
// logger.Info(ctx, "stop real time data computing by context cancel")
|
||||||
|
// return
|
||||||
|
// case realTimeData := <-RealTimeDataChan:
|
||||||
|
// componentUUID := realTimeData.PayLoad.ComponentUUID
|
||||||
|
// component, err := diagram.GetComponentMap(componentUUID)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
// componentType := component.Type
|
||||||
|
// if componentType != constants.DemoType {
|
||||||
|
// logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var anchorName string
|
||||||
|
// var compareValUpperLimit, compareValLowerLimit float64
|
||||||
|
// var anchorRealTimeData []float64
|
||||||
|
// var calculateFunc func(archorValue float64, args ...float64) float64
|
||||||
|
|
||||||
|
// // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)
|
||||||
|
|
||||||
|
// for _, param := range realTimeData.PayLoad.Values {
|
||||||
|
// anchorRealTimeData = append(anchorRealTimeData, param.Value)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// anchorConfig := config.AnchorParamConfig{
|
||||||
|
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
|
||||||
|
// ComponentUUID: componentUUID,
|
||||||
|
// AnchorName: anchorName,
|
||||||
|
// CompareValUpperLimit: compareValUpperLimit,
|
||||||
|
// CompareValLowerLimit: compareValLowerLimit,
|
||||||
|
// AnchorRealTimeData: anchorRealTimeData,
|
||||||
|
// },
|
||||||
|
// CalculateFunc: calculateFunc,
|
||||||
|
// CalculateParams: []float64{},
|
||||||
|
// }
|
||||||
|
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// anchorChan <- anchorConfig
|
||||||
|
// default:
|
||||||
|
// msg, err := consumer.ReadMessage(batchTimeout)
|
||||||
|
// if err != nil {
|
||||||
|
// if err.(kafka.Error).Code() == kafka.ErrTimedOut {
|
||||||
|
// // process accumulated messages when timeout
|
||||||
|
// if len(messages) > 0 {
|
||||||
|
// processMessageBatch(ctx, messages)
|
||||||
|
// consumer.Commit()
|
||||||
|
// messages = messages[:0]
|
||||||
|
// }
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
// messages = append(messages, msg)
|
||||||
|
// // process messages when batch size or timeout period is reached
|
||||||
|
// if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
|
||||||
|
// processMessageBatch(ctx, messages)
|
||||||
|
// consumer.Commit()
|
||||||
|
// messages = messages[:0]
|
||||||
|
// lastCommit = time.Now()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// type realTimeDataPayload struct {
|
||||||
|
// ComponentUUID string
|
||||||
|
// Values []float64
|
||||||
|
// }
|
||||||
|
|
||||||
|
// type realTimeData struct {
|
||||||
|
// Payload realTimeDataPayload
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
||||||
|
// var realTimeData realTimeData
|
||||||
|
// err := json.Unmarshal(msgValue, &realTimeData)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||||
|
// }
|
||||||
|
// return &realTimeData, nil
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||||
|
// componentUUID := realTimeData.Payload.ComponentUUID
|
||||||
|
// component, err := diagram.GetComponentMap(componentUUID)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "query component info from diagram map by component id failed",
|
||||||
|
// "component_uuid", componentUUID, "error", err)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// componentType := component.Type
|
||||||
|
// if componentType != constants.DemoType {
|
||||||
|
// logger.Error(ctx, "can not process real time data of component type not equal DemoType",
|
||||||
|
// "component_uuid", componentUUID)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var anchorName string
|
||||||
|
// var compareValUpperLimit, compareValLowerLimit float64
|
||||||
|
// var anchorRealTimeData []float64
|
||||||
|
// var calculateFunc func(archorValue float64, args ...float64) float64
|
||||||
|
|
||||||
|
// for _, param := range realTimeData.Payload.Values {
|
||||||
|
// anchorRealTimeData = append(anchorRealTimeData, param)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// anchorConfig := config.AnchorParamConfig{
|
||||||
|
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
|
||||||
|
// ComponentUUID: componentUUID,
|
||||||
|
// AnchorName: anchorName,
|
||||||
|
// CompareValUpperLimit: compareValUpperLimit,
|
||||||
|
// CompareValLowerLimit: compareValLowerLimit,
|
||||||
|
// AnchorRealTimeData: anchorRealTimeData,
|
||||||
|
// },
|
||||||
|
// CalculateFunc: calculateFunc,
|
||||||
|
// CalculateParams: []float64{},
|
||||||
|
// }
|
||||||
|
|
||||||
|
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "get anchor param chan failed",
|
||||||
|
// "component_uuid", componentUUID, "error", err)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// select {
|
||||||
|
// case anchorChan <- anchorConfig:
|
||||||
|
// case <-ctx.Done():
|
||||||
|
// logger.Info(ctx, "context done while sending to anchor chan")
|
||||||
|
// case <-time.After(5 * time.Second):
|
||||||
|
// logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // processMessageBatch define func to bathc process kafka message
|
||||||
|
// func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
|
||||||
|
// for _, msg := range messages {
|
||||||
|
// realTimeData, err := parseKafkaMessage(msg.Value)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// go processRealTimeData(ctx, realTimeData)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
@ -1,229 +0,0 @@
|
||||||
// Package realtimedata define real time data operation functions
|
|
||||||
package realtimedata
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"modelRT/constants"
|
|
||||||
"modelRT/diagram"
|
|
||||||
"modelRT/logger"
|
|
||||||
"modelRT/model"
|
|
||||||
"modelRT/network"
|
|
||||||
"modelRT/orm"
|
|
||||||
"modelRT/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// RealTimeDataChan define channel of real time data receive
|
|
||||||
RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
|
||||||
globalComputeState *MeasComputeState
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
|
||||||
globalComputeState = NewMeasComputeState()
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartComputingRealTimeDataLimit define func to start compute real time data up or down limit process goroutines by measurement info
|
|
||||||
func StartComputingRealTimeDataLimit(ctx context.Context, measurements []orm.Measurement) {
|
|
||||||
for _, measurement := range measurements {
|
|
||||||
enableValue, exist := measurement.EventPlan["enable"]
|
|
||||||
enable, ok := enableValue.(bool)
|
|
||||||
if !exist || !enable {
|
|
||||||
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
conf, err := initComputeConfig(measurement)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf == nil {
|
|
||||||
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uuidStr := measurement.ComponentUUID.String()
|
|
||||||
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
|
|
||||||
conf.StopGchan = make(chan struct{})
|
|
||||||
globalComputeState.Store(uuidStr, conf)
|
|
||||||
logger.Info(ctx, "starting computing real time data limit for measurement", "measurement_uuid", measurement.ComponentUUID)
|
|
||||||
go continuousComputation(enrichedCtx, conf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
enableValue, exist := measurement.EventPlan["enable"]
|
|
||||||
enable, ok := enableValue.(bool)
|
|
||||||
if !exist {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !enable {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
conf := &ComputeConfig{}
|
|
||||||
|
|
||||||
causeValue, exist := measurement.EventPlan["cause"]
|
|
||||||
if !exist {
|
|
||||||
return nil, errors.New("missing required field cause")
|
|
||||||
}
|
|
||||||
|
|
||||||
cause, ok := causeValue.(map[string]any)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
|
||||||
}
|
|
||||||
conf.Cause, err = processCauseMap(cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
actionValue, exist := measurement.EventPlan["action"]
|
|
||||||
if !exist {
|
|
||||||
return nil, errors.New("missing required field action")
|
|
||||||
}
|
|
||||||
action, ok := actionValue.(map[string]any)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
|
|
||||||
}
|
|
||||||
conf.Action = action
|
|
||||||
|
|
||||||
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
|
||||||
}
|
|
||||||
conf.QueryKey = queryKey
|
|
||||||
conf.DataSize = int64(measurement.Size)
|
|
||||||
// TODO use constant values for temporary settings
|
|
||||||
conf.minBreachCount = constants.MinBreachCount
|
|
||||||
// TODO 后续优化 duration 创建方式
|
|
||||||
conf.Duration = 10
|
|
||||||
|
|
||||||
isFloatCause := false
|
|
||||||
if _, exists := conf.Cause["up"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["down"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["upup"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
} else if _, exists := conf.Cause["downdown"]; exists {
|
|
||||||
isFloatCause = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if isFloatCause {
|
|
||||||
// te config
|
|
||||||
teThresholds, err := parseTEThresholds(conf.Cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
|
||||||
}
|
|
||||||
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
|
||||||
} else {
|
|
||||||
// ti config
|
|
||||||
tiThresholds, err := parseTIThresholds(conf.Cause)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
|
||||||
}
|
|
||||||
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func processCauseMap(data map[string]any) (map[string]any, error) {
|
|
||||||
causeResult := make(map[string]any)
|
|
||||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
|
||||||
|
|
||||||
var foundFloatKey bool
|
|
||||||
for _, key := range keysToExtract {
|
|
||||||
if value, exists := data[key]; exists {
|
|
||||||
|
|
||||||
foundFloatKey = true
|
|
||||||
|
|
||||||
// check value type
|
|
||||||
if floatVal, ok := value.(float64); ok {
|
|
||||||
causeResult[key] = floatVal
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if foundFloatKey {
|
|
||||||
return causeResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
edgeKey := "edge"
|
|
||||||
if value, exists := data[edgeKey]; exists {
|
|
||||||
if stringVal, ok := value.(string); ok {
|
|
||||||
switch stringVal {
|
|
||||||
case "raising":
|
|
||||||
fallthrough
|
|
||||||
case "falling":
|
|
||||||
causeResult[edgeKey] = stringVal
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
|
||||||
client := diagram.NewRedisClient()
|
|
||||||
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
|
||||||
duration := util.SecondsToDuration(conf.Duration)
|
|
||||||
ticker := time.NewTicker(duration)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-conf.StopGchan:
|
|
||||||
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
|
||||||
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
|
|
||||||
cancel()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
realTimedatas := util.ConvertZSetMembersToFloat64(members)
|
|
||||||
if len(realTimedatas) == 0 {
|
|
||||||
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf.Analyzer != nil {
|
|
||||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
|
||||||
} else {
|
|
||||||
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,32 +0,0 @@
|
||||||
// Package router provides router config
|
|
||||||
package router
|
|
||||||
|
|
||||||
import (
|
|
||||||
"modelRT/handler"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
// registerAsyncTaskRoutes define func of register async task routes
|
|
||||||
func registerAsyncTaskRoutes(rg *gin.RouterGroup, middlewares ...gin.HandlerFunc) {
|
|
||||||
g := rg.Group("/task/")
|
|
||||||
g.Use(middlewares...)
|
|
||||||
|
|
||||||
// Async task creation
|
|
||||||
g.POST("async", handler.AsyncTaskCreateHandler)
|
|
||||||
|
|
||||||
// Async task result query
|
|
||||||
g.GET("async/results", handler.AsyncTaskResultQueryHandler)
|
|
||||||
|
|
||||||
// Async task detail query
|
|
||||||
g.GET("async/:task_id", handler.AsyncTaskResultDetailHandler)
|
|
||||||
|
|
||||||
// Async task cancellation
|
|
||||||
g.POST("async/:task_id/cancel", handler.AsyncTaskCancelHandler)
|
|
||||||
|
|
||||||
// Internal APIs for worker updates (not exposed to external users)
|
|
||||||
internal := g.Group("internal/")
|
|
||||||
internal.Use(middlewares...)
|
|
||||||
internal.POST("async/progress", handler.AsyncTaskProgressUpdateHandler)
|
|
||||||
internal.POST("async/status", handler.AsyncTaskStatusUpdateHandler)
|
|
||||||
}
|
|
||||||
|
|
@ -27,5 +27,4 @@ func RegisterRoutes(engine *gin.Engine, clientToken string) {
|
||||||
registerDataRoutes(routeGroup)
|
registerDataRoutes(routeGroup)
|
||||||
registerMonitorRoutes(routeGroup)
|
registerMonitorRoutes(routeGroup)
|
||||||
registerComponentRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
|
registerComponentRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
|
||||||
registerAsyncTaskRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
package sharememory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"modelRT/orm"
|
||||||
|
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CreateShareMemory defines a function to create a shared memory
|
||||||
|
func CreateShareMemory(key uintptr, structSize uintptr) (uintptr, error) {
|
||||||
|
// logger := logger.GetLoggerInstance()
|
||||||
|
// create shared memory
|
||||||
|
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, structSize, unix.IPC_CREAT|0o666)
|
||||||
|
if err != 0 {
|
||||||
|
// logger.Error(fmt.Sprintf("create shared memory by key %v failed:", key), zap.Error(err))
|
||||||
|
return 0, fmt.Errorf("create shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// attach shared memory
|
||||||
|
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||||
|
if err != 0 {
|
||||||
|
// logger.Error(fmt.Sprintf("attach shared memory by shmID %v failed:", shmID), zap.Error(err))
|
||||||
|
return 0, fmt.Errorf("attach shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
return shmAddr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadComponentFromShareMemory defines a function to read component value from shared memory
|
||||||
|
func ReadComponentFromShareMemory(key uintptr, componentInfo *orm.Component) error {
|
||||||
|
structSize := unsafe.Sizeof(orm.Component{})
|
||||||
|
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666)
|
||||||
|
if err != 0 {
|
||||||
|
return fmt.Errorf("get shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||||
|
if err != 0 {
|
||||||
|
return fmt.Errorf("attach shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取共享内存中的数据
|
||||||
|
componentInfo = (*orm.Component)(unsafe.Pointer(shmAddr + structSize))
|
||||||
|
|
||||||
|
// Detach shared memory
|
||||||
|
unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func WriteComponentInShareMemory(key uintptr, componentInfo *orm.Component) error {
|
||||||
|
structSize := unsafe.Sizeof(orm.Component{})
|
||||||
|
shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666)
|
||||||
|
if err != 0 {
|
||||||
|
return fmt.Errorf("get shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
|
||||||
|
if err != 0 {
|
||||||
|
return fmt.Errorf("attach shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := (*orm.Component)(unsafe.Pointer(shmAddr + unsafe.Sizeof(structSize)))
|
||||||
|
fmt.Println(obj)
|
||||||
|
|
||||||
|
// id integer NOT NULL DEFAULT nextval('component_id_seq'::regclass),
|
||||||
|
// global_uuid uuid NOT NULL DEFAULT gen_random_uuid(),
|
||||||
|
// nspath character varying(32) COLLATE pg_catalog."default",
|
||||||
|
// tag character varying(32) COLLATE pg_catalog."default" NOT NULL,
|
||||||
|
// name character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||||
|
// description character varying(512) COLLATE pg_catalog."default" NOT NULL DEFAULT ''::character varying,
|
||||||
|
// grid character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||||
|
// zone character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||||
|
// station character varying(64) COLLATE pg_catalog."default" NOT NULL,
|
||||||
|
// type integer NOT NULL,
|
||||||
|
// in_service boolean DEFAULT false,
|
||||||
|
// state integer NOT NULL DEFAULT 0,
|
||||||
|
// connected_bus jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||||
|
// label jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||||
|
// context jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||||
|
// page_id integer NOT NULL,
|
||||||
|
// op integer NOT NULL DEFAULT '-1'::integer,
|
||||||
|
// ts timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
|
||||||
|
unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteShareMemory defines a function to delete shared memory
|
||||||
|
func DeleteShareMemory(key uintptr) error {
|
||||||
|
_, _, err := unix.Syscall(unix.SYS_SHM_UNLINK, key, 0, 0o666)
|
||||||
|
if err != 0 {
|
||||||
|
return fmt.Errorf("get shared memory failed:%w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -1,77 +0,0 @@
|
||||||
package task
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/gofrs/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DefaultPriority is the default task priority
|
|
||||||
const DefaultPriority = 5
|
|
||||||
|
|
||||||
// HighPriority represents high priority tasks
|
|
||||||
const HighPriority = 10
|
|
||||||
|
|
||||||
// LowPriority represents low priority tasks
|
|
||||||
const LowPriority = 1
|
|
||||||
|
|
||||||
// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch
|
|
||||||
// This struct is designed to be lightweight for efficient message transport
|
|
||||||
type TaskQueueMessage struct {
|
|
||||||
TaskID uuid.UUID `json:"task_id"`
|
|
||||||
TaskType TaskType `json:"task_type"`
|
|
||||||
Priority int `json:"priority,omitempty"` // Optional, defaults to DefaultPriority
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTaskQueueMessage creates a new TaskQueueMessage with default priority
|
|
||||||
func NewTaskQueueMessage(taskID uuid.UUID, taskType TaskType) *TaskQueueMessage {
|
|
||||||
return &TaskQueueMessage{
|
|
||||||
TaskID: taskID,
|
|
||||||
TaskType: taskType,
|
|
||||||
Priority: DefaultPriority,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTaskQueueMessageWithPriority creates a new TaskQueueMessage with specified priority
|
|
||||||
func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priority int) *TaskQueueMessage {
|
|
||||||
return &TaskQueueMessage{
|
|
||||||
TaskID: taskID,
|
|
||||||
TaskType: taskType,
|
|
||||||
Priority: priority,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToJSON converts the TaskQueueMessage to JSON bytes
|
|
||||||
func (m *TaskQueueMessage) ToJSON() ([]byte, error) {
|
|
||||||
return []byte{}, nil // Placeholder - actual implementation would use json.Marshal
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate checks if the TaskQueueMessage is valid
|
|
||||||
func (m *TaskQueueMessage) Validate() bool {
|
|
||||||
// Check if TaskID is valid (not nil UUID)
|
|
||||||
if m.TaskID == uuid.Nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if TaskType is valid
|
|
||||||
switch m.TaskType {
|
|
||||||
case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPriority sets the priority of the task queue message with validation
|
|
||||||
func (m *TaskQueueMessage) SetPriority(priority int) {
|
|
||||||
if priority < LowPriority {
|
|
||||||
priority = LowPriority
|
|
||||||
}
|
|
||||||
if priority > HighPriority {
|
|
||||||
priority = HighPriority
|
|
||||||
}
|
|
||||||
m.Priority = priority
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPriority returns the priority of the task queue message
|
|
||||||
func (m *TaskQueueMessage) GetPriority() int {
|
|
||||||
return m.Priority
|
|
||||||
}
|
|
||||||
|
|
@ -1,55 +0,0 @@
|
||||||
package task
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type TaskStatus string
|
|
||||||
|
|
||||||
const (
|
|
||||||
StatusPending TaskStatus = "PENDING"
|
|
||||||
StatusRunning TaskStatus = "RUNNING"
|
|
||||||
StatusCompleted TaskStatus = "COMPLETED"
|
|
||||||
StatusFailed TaskStatus = "FAILED"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TaskType 定义异步任务的具体业务类型
|
|
||||||
type TaskType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
TypeTopologyAnalysis TaskType = "TOPOLOGY_ANALYSIS"
|
|
||||||
TypeEventAnalysis TaskType = "EVENT_ANALYSIS"
|
|
||||||
TypeBatchImport TaskType = "BATCH_IMPORT"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Task struct {
|
|
||||||
ID string `bson:"_id" json:"id"`
|
|
||||||
Type TaskType `bson:"type" json:"type"`
|
|
||||||
Status TaskStatus `bson:"status" json:"status"`
|
|
||||||
Priority int `bson:"priority" json:"priority"`
|
|
||||||
|
|
||||||
Params map[string]interface{} `bson:"params" json:"params"`
|
|
||||||
Result map[string]interface{} `bson:"result,omitempty" json:"result"`
|
|
||||||
ErrorMsg string `bson:"error_msg,omitempty" json:"error_msg"`
|
|
||||||
|
|
||||||
CreatedAt time.Time `bson:"created_at" json:"created_at"`
|
|
||||||
StartedAt time.Time `bson:"started_at,omitempty" json:"started_at"`
|
|
||||||
CompletedAt time.Time `bson:"completed_at,omitempty" json:"completed_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TopologyParams struct {
|
|
||||||
CheckIsland bool `json:"check_island"`
|
|
||||||
CheckShort bool `json:"check_short"`
|
|
||||||
BaseModelIDs []string `json:"base_model_ids"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventAnalysisParams struct {
|
|
||||||
MotorID string `json:"motor_id"`
|
|
||||||
TriggerID string `json:"trigger_id"`
|
|
||||||
DurationMS int `json:"duration_ms"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type BatchImportParams struct {
|
|
||||||
FileName string `json:"file_name"`
|
|
||||||
FilePath string `json:"file_path"`
|
|
||||||
}
|
|
||||||
|
|
@ -3,7 +3,6 @@ package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -14,36 +13,25 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewRedisClient define func of initialize the Redis client
|
// NewRedisClient define func of initialize the Redis client
|
||||||
func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
|
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
|
||||||
// default options
|
// default options
|
||||||
configs := &clientConfig{
|
options := RedisOptions{
|
||||||
Options: &redis.Options{
|
redisOptions: &redis.Options{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
DialTimeout: 5 * time.Second,
|
|
||||||
ReadTimeout: 3 * time.Second,
|
|
||||||
WriteTimeout: 3 * time.Second,
|
|
||||||
PoolSize: 10,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply configuration options from config
|
// Apply configuration options from config
|
||||||
var errs []error
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
if err := opt(configs); err != nil {
|
opt(&options)
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errs) > 0 {
|
|
||||||
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// create redis client
|
// create redis client
|
||||||
client := redis.NewClient(configs.Options)
|
client := redis.NewClient(options.redisOptions)
|
||||||
|
|
||||||
if configs.DialTimeout > 0 {
|
if options.timeout > 0 {
|
||||||
// check if the connection is successful
|
// check if the connection is successful
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), options.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := client.Ping(ctx).Err(); err != nil {
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
return nil, fmt.Errorf("can not connect redis:%v", err)
|
return nil, fmt.Errorf("can not connect redis:%v", err)
|
||||||
|
|
@ -55,29 +43,22 @@ func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
|
||||||
// NewRedigoPool define func of initialize the Redigo pool
|
// NewRedigoPool define func of initialize the Redigo pool
|
||||||
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
||||||
pool := &redigo.Pool{
|
pool := &redigo.Pool{
|
||||||
// TODO optimize IdleTimeout with config parameter
|
|
||||||
MaxIdle: rCfg.PoolSize / 2,
|
MaxIdle: rCfg.PoolSize / 2,
|
||||||
MaxActive: rCfg.PoolSize,
|
MaxActive: rCfg.PoolSize,
|
||||||
|
// TODO optimize IdleTimeout with config parameter
|
||||||
IdleTimeout: 240 * time.Second,
|
IdleTimeout: 240 * time.Second,
|
||||||
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
|
|
||||||
if time.Since(t) < time.Minute {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
_, err := c.Do("PING")
|
|
||||||
return err
|
|
||||||
},
|
|
||||||
|
|
||||||
|
// Dial function to create the connection
|
||||||
Dial: func() (redigo.Conn, error) {
|
Dial: func() (redigo.Conn, error) {
|
||||||
dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
|
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒
|
||||||
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
|
|
||||||
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
|
|
||||||
|
|
||||||
opts := []redigo.DialOption{
|
opts := []redigo.DialOption{
|
||||||
redigo.DialDatabase(rCfg.DB),
|
redigo.DialDatabase(rCfg.DB),
|
||||||
redigo.DialPassword(rCfg.Password),
|
redigo.DialPassword(rCfg.Password),
|
||||||
redigo.DialConnectTimeout(dialTimeout),
|
redigo.DialConnectTimeout(timeout),
|
||||||
redigo.DialReadTimeout(readTimeout),
|
// redigo.DialReadTimeout(timeout),
|
||||||
redigo.DialWriteTimeout(writeTimeout),
|
// redigo.DialWriteTimeout(timeout),
|
||||||
|
// TODO add redigo.DialUsername when redis open acl
|
||||||
|
// redis.DialUsername("username"),
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
|
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
|
||||||
|
|
@ -91,14 +72,13 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
if err := conn.Err(); err != nil {
|
if conn.Err() != nil {
|
||||||
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
|
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := conn.Do("PING")
|
_, err := conn.Do("PING")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
|
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,79 +5,56 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/constants"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
type clientConfig struct {
|
type RedisOptions struct {
|
||||||
*redis.Options
|
redisOptions *redis.Options
|
||||||
|
timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*clientConfig) error
|
type RedisOption func(*RedisOptions) error
|
||||||
|
|
||||||
// WithPassword define func of configure redis password options
|
// WithPassword define func of configure redis password options
|
||||||
func WithPassword(password string, env string) Option {
|
func WithPassword(password string) RedisOption {
|
||||||
return func(c *clientConfig) error {
|
return func(o *RedisOptions) error {
|
||||||
if env == constants.ProductionDeployMode && password == "" {
|
if password == "" {
|
||||||
return errors.New("password is empty")
|
return errors.New("password is empty")
|
||||||
}
|
}
|
||||||
c.Password = password
|
o.redisOptions.Password = password
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithConnectTimeout define func of configure redis connect timeout options
|
// WithTimeout define func of configure redis timeout options
|
||||||
func WithConnectTimeout(timeout time.Duration) Option {
|
func WithTimeout(timeout time.Duration) RedisOption {
|
||||||
return func(c *clientConfig) error {
|
return func(o *RedisOptions) error {
|
||||||
if timeout < 0 {
|
if timeout < 0 {
|
||||||
return errors.New("timeout can not be negative")
|
return errors.New("timeout can not be negative")
|
||||||
}
|
}
|
||||||
c.DialTimeout = timeout
|
o.timeout = timeout
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithReadTimeout define func of configure redis read timeout options
|
|
||||||
func WithReadTimeout(timeout time.Duration) Option {
|
|
||||||
return func(c *clientConfig) error {
|
|
||||||
if timeout < 0 {
|
|
||||||
return errors.New("timeout can not be negative")
|
|
||||||
}
|
|
||||||
c.ReadTimeout = timeout
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithWriteTimeout define func of configure redis write timeout options
|
|
||||||
func WithWriteTimeout(timeout time.Duration) Option {
|
|
||||||
return func(c *clientConfig) error {
|
|
||||||
if timeout < 0 {
|
|
||||||
return errors.New("timeout can not be negative")
|
|
||||||
}
|
|
||||||
c.WriteTimeout = timeout
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDB define func of configure redis db options
|
// WithDB define func of configure redis db options
|
||||||
func WithDB(db int) Option {
|
func WithDB(db int) RedisOption {
|
||||||
return func(c *clientConfig) error {
|
return func(o *RedisOptions) error {
|
||||||
if db < 0 {
|
if db < 0 {
|
||||||
return errors.New("db can not be negative")
|
return errors.New("db can not be negative")
|
||||||
}
|
}
|
||||||
c.DB = db
|
o.redisOptions.DB = db
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithPoolSize define func of configure pool size options
|
// WithPoolSize define func of configure pool size options
|
||||||
func WithPoolSize(poolSize int) Option {
|
func WithPoolSize(poolSize int) RedisOption {
|
||||||
return func(c *clientConfig) error {
|
return func(o *RedisOptions) error {
|
||||||
if poolSize <= 0 {
|
if poolSize <= 0 {
|
||||||
return errors.New("pool size must be greater than 0")
|
return errors.New("pool size must be greater than 0")
|
||||||
}
|
}
|
||||||
c.PoolSize = poolSize
|
o.redisOptions.PoolSize = poolSize
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue