Compare commits

..

No commits in common. "feature/bay-realtime-data-calc" and "develop" have entirely different histories.

60 changed files with 964 additions and 2899 deletions

13
.gitignore vendored
View File

@ -27,16 +27,3 @@ go.work
/log/
# Shield config files in the configs folder
/configs/**/*.yaml
/configs/**/*.pem
# ai config
.cursor/
.claude/
.cursorrules
.copilot/
.chatgpt/
.ai_history/
.vector_cache/
ai-debug.log
*.patch
*.diff

View File

@ -16,12 +16,6 @@ var (
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
// ErrSubTargetRepeat define variable to indicates subscription target already exist in list
ErrSubTargetRepeat = newError(40005, "subscription target already exist in list")
// ErrSubTargetNotFound define variable to indicates can not find measurement by subscription target
ErrSubTargetNotFound = newError(40006, "found measuremnet by subscription target failed")
// ErrCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
ErrCancelSubTargetMissing = newError(40007, "cancel a not exist subscription target")
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
ErrDBQueryFailed = newError(50001, "query postgres database data failed")

View File

@ -1,10 +0,0 @@
// Package common define common error variables
package common
import "errors"
// ErrUnknowEventActionCommand define error of unknown event action command
var ErrUnknowEventActionCommand = errors.New("unknown action command")
// ErrExecEventActionFailed define error of execute event action failed
var ErrExecEventActionFailed = errors.New("exec event action func failed")

View File

@ -44,11 +44,10 @@ var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type, anchor name and component data
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
if componentType == constants.DemoType {
switch anchorName {
case "voltage":
if anchorName == "voltage" {
resistance := componentData["resistance"].(float64)
return baseVoltageFunc, []float64{resistance}
case "current":
} else if anchorName == "current" {
resistance := componentData["resistance"].(float64)
return baseCurrentFunc, []float64{resistance}
}

View File

@ -19,21 +19,6 @@ type ServiceConfig struct {
ServiceAddr string `mapstructure:"service_addr"`
ServiceName string `mapstructure:"service_name"`
SecretKey string `mapstructure:"secret_key"`
DeployEnv string `mapstructure:"deploy_env"`
}
// RabbitMQConfig define config struct of RabbitMQ config
type RabbitMQConfig struct {
CACertPath string `mapstructure:"ca_cert_path"`
ClientKeyPath string `mapstructure:"client_key_path"`
ClientKeyPassword string `mapstructure:"client_key_password"`
ClientCertPath string `mapstructure:"client_cert_path"`
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
ServerName string `mapstructure:"server_name"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
}
// KafkaConfig define config struct of kafka config
@ -72,9 +57,7 @@ type RedisConfig struct {
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
PoolSize int `mapstructure:"poolsize"`
DialTimeout int `mapstructure:"dial_timeout"`
ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"`
Timeout int `mapstructure:"timeout"`
}
// AntsConfig define config struct of ants pool config
@ -96,7 +79,6 @@ type ModelRTConfig struct {
BaseConfig `mapstructure:"base"`
ServiceConfig `mapstructure:"service"`
PostgresConfig `mapstructure:"postgres"`
RabbitMQConfig `mapstructure:"rabbitmq"`
KafkaConfig `mapstructure:"kafka"`
LoggerConfig `mapstructure:"logger"`
AntsConfig `mapstructure:"ants"`

View File

@ -0,0 +1,17 @@
// Package constants define constant variable
package constants
const (
// CodeSuccess define constant to indicates that the API was successfully processed
CodeSuccess = 20000
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
CodeInvalidParamFailed = 40001
// CodeDBQueryFailed define constant to indicates database query operation failed
CodeDBQueryFailed = 50001
// CodeDBUpdateailed define constant to indicates database update operation failed
CodeDBUpdateailed = 50002
// CodeRedisQueryFailed define constant to indicates redis query operation failed
CodeRedisQueryFailed = 60001
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
CodeRedisUpdateFailed = 60002
)

View File

@ -1,31 +0,0 @@
// Package constants define constant variable
package constants
const (
// CodeSuccess define constant to indicates that the API was successfully processed
CodeSuccess = 20000
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
CodeInvalidParamFailed = 40001
// CodeFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
CodeFoundTargetFailed = 40004
// CodeSubTargetRepeat define variable to indicates subscription target already exist in list
CodeSubTargetRepeat = 40005
// CodeSubTargetNotFound define variable to indicates can not find measurement by subscription target
CodeSubTargetNotFound = 40006
// CodeCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
CodeCancelSubTargetMissing = 40007
// CodeUpdateSubTargetMissing define variable to indicates update a not exist subscription target
CodeUpdateSubTargetMissing = 40008
// CodeAppendSubTargetMissing define variable to indicates append a not exist subscription target
CodeAppendSubTargetMissing = 40009
// CodeUnsupportSubOperation define variable to indicates append a not exist subscription target
CodeUnsupportSubOperation = 40010
// CodeDBQueryFailed define constant to indicates database query operation failed
CodeDBQueryFailed = 50001
// CodeDBUpdateailed define constant to indicates database update operation failed
CodeDBUpdateailed = 50002
// CodeRedisQueryFailed define constant to indicates redis query operation failed
CodeRedisQueryFailed = 60001
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
CodeRedisUpdateFailed = 60002
)

View File

@ -1,11 +0,0 @@
// Package constants define constant variable
package constants
const (
// DevelopmentDeployMode define development operator environment for modelRT project
DevelopmentDeployMode = "development"
// DebugDeployMode define debug operator environment for modelRT project
DebugDeployMode = "debug"
// ProductionDeployMode define production operator environment for modelRT project
ProductionDeployMode = "production"
)

View File

@ -1,5 +1,5 @@
// Package common define common error variables
package common
// Package constants define constant variable
package constants
import "errors"

View File

@ -1,92 +1,31 @@
// Package constants define constant variable
package constants
// EvenvtType define event type
type EvenvtType int
const (
// EventGeneralHard define gereral hard event type
EventGeneralHard EvenvtType = iota
// EventGeneralPlatformSoft define gereral platform soft event type
EventGeneralPlatformSoft
// EventGeneralApplicationSoft define gereral application soft event type
EventGeneralApplicationSoft
// EventWarnHard define warn hard event type
EventWarnHard
// EventWarnPlatformSoft define warn platform soft event type
EventWarnPlatformSoft
// EventWarnApplicationSoft define warn application soft event type
EventWarnApplicationSoft
// EventCriticalHard define critical hard event type
EventCriticalHard
// EventCriticalPlatformSoft define critical platform soft event type
EventCriticalPlatformSoft
// EventCriticalApplicationSoft define critical application soft event type
EventCriticalApplicationSoft
)
// IsGeneral define func to check event type is general
func IsGeneral(eventType EvenvtType) bool {
return eventType < 3
}
// IsWarning define func to check event type is warn
func IsWarning(eventType EvenvtType) bool {
return eventType >= 3 && eventType <= 5
}
// IsCritical define func to check event type is critical
func IsCritical(eventType EvenvtType) bool {
return eventType >= 6
}
const (
// EventFromStation define event from station type
EventFromStation = "station"
// EventFromPlatform define event from platform type
EventFromPlatform = "platform"
// EventFromOthers define event from others type
EventFromOthers = "others"
// TIBreachTriggerType define out of bounds type constant
TIBreachTriggerType = "trigger"
)
const (
// EventStatusHappended define status for event record when event just happened, no data attached yet
EventStatusHappended = iota
// EventStatusDataAttached define status for event record when event just happened, data attached already
EventStatusDataAttached
// EventStatusReported define status for event record when event reported to CIM, no matter it's successful or failed
EventStatusReported
// EventStatusConfirmed define status for event record when event confirmed by CIM, no matter it's successful or failed
EventStatusConfirmed
// EventStatusPersisted define status for event record when event persisted in database, no matter it's successful or failed
EventStatusPersisted
// EventStatusClosed define status for event record when event closed, no matter it's successful or failed
EventStatusClosed
// TelemetryUpLimit define telemetry upper limit
TelemetryUpLimit = "up"
// TelemetryUpUpLimit define telemetry upper upper limit
TelemetryUpUpLimit = "upup"
// TelemetryDownLimit define telemetry limit
TelemetryDownLimit = "down"
// TelemetryDownDownLimit define telemetry lower lower limit
TelemetryDownDownLimit = "downdown"
)
const (
// EventExchangeName define exchange name for event alarm message
EventExchangeName = "event-exchange"
// EventDeadExchangeName define dead letter exchange name for event alarm message
EventDeadExchangeName = "event-dead-letter-exchange"
// TelesignalRaising define telesignal raising edge
TelesignalRaising = "raising"
// TelesignalFalling define telesignal falling edge
TelesignalFalling = "falling"
)
const (
// EventUpDownRoutingKey define routing key for up or down limit event alarm message
EventUpDownRoutingKey = "event.#"
// EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message
EventUpDownDeadRoutingKey = "event.#"
// EventUpDownQueueName define queue name for up or down limit event alarm message
EventUpDownQueueName = "event-up-down-queue"
// EventUpDownDeadQueueName define dead letter queue name for event alarm message
EventUpDownDeadQueueName = "event-dead-letter-queue"
)
const (
// EventGeneralUpDownLimitCategroy define category for general up and down limit event
EventGeneralUpDownLimitCategroy = "event.general.updown.limit"
// EventWarnUpDownLimitCategroy define category for warn up and down limit event
EventWarnUpDownLimitCategroy = "event.warn.updown.limit"
// EventCriticalUpDownLimitCategroy define category for critical up and down limit event
EventCriticalUpDownLimitCategroy = "event.critical.updown.limit"
// MinBreachCount define min breach count of real time data
MinBreachCount = 10
)

View File

@ -12,6 +12,29 @@ const (
SubUpdateAction string = "update"
)
// 定义状态常量
// TODO 从4位格式修改为5位格式
const (
// SubSuccessCode define subscription success code
SubSuccessCode = "1001"
// SubFailedCode define subscription failed code
SubFailedCode = "1002"
// RTDSuccessCode define real time data return success code
RTDSuccessCode = "1003"
// RTDFailedCode define real time data return failed code
RTDFailedCode = "1004"
// CancelSubSuccessCode define cancel subscription success code
CancelSubSuccessCode = "1005"
// CancelSubFailedCode define cancel subscription failed code
CancelSubFailedCode = "1006"
// SubRepeatCode define subscription repeat code
SubRepeatCode = "1007"
// UpdateSubSuccessCode define update subscription success code
UpdateSubSuccessCode = "1008"
// UpdateSubFailedCode define update subscription failed code
UpdateSubFailedCode = "1009"
)
const (
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
SysCtrlPrefix = "SYS_CTRL_"

View File

@ -1,31 +0,0 @@
// Package constants define constant variable
package constants
const (
// TIBreachTriggerType define out of bounds type constant
TIBreachTriggerType = "trigger"
)
const (
// TelemetryUpLimit define telemetry upper limit
TelemetryUpLimit = "up"
// TelemetryUpUpLimit define telemetry upper upper limit
TelemetryUpUpLimit = "upup"
// TelemetryDownLimit define telemetry limit
TelemetryDownLimit = "down"
// TelemetryDownDownLimit define telemetry lower lower limit
TelemetryDownDownLimit = "downdown"
)
const (
// TelesignalRaising define telesignal raising edge
TelesignalRaising = "raising"
// TelesignalFalling define telesignal falling edge
TelesignalFalling = "falling"
)
const (
// MinBreachCount define min breach count of real time data
MinBreachCount = 10
)

View File

@ -1,321 +0,0 @@
// Package database define database operation functions
package database
import (
	"context"
	"errors"
	"time"

	"modelRT/orm"

	"github.com/gofrs/uuid"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)
// CreateAsyncTask creates a new async task in the database
// CreateAsyncTask inserts a new async task row with a freshly generated
// UUID, status "submitted" and the current Unix time, returning the row.
func CreateAsyncTask(ctx context.Context, tx *gorm.DB, taskType orm.AsyncTaskType, params orm.JSONMap) (*orm.AsyncTask, error) {
	id, err := uuid.NewV4()
	if err != nil {
		return nil, err
	}
	newTask := orm.AsyncTask{
		TaskID:    id,
		TaskType:  taskType,
		Status:    orm.AsyncTaskStatusSubmitted,
		Params:    params,
		CreatedAt: time.Now().Unix(),
	}
	// Bound the insert so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if err := tx.WithContext(opCtx).Create(&newTask).Error; err != nil {
		return nil, err
	}
	return &newTask, nil
}
// GetAsyncTaskByID retrieves an async task by its ID
// GetAsyncTaskByID loads the single task row matching taskID while
// holding a row-level SELECT ... FOR UPDATE lock inside the caller's
// transaction.
func GetAsyncTaskByID(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTask, error) {
	// Bound the query so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	task := new(orm.AsyncTask)
	err := tx.WithContext(opCtx).
		Where("task_id = ?", taskID).
		Clauses(clause.Locking{Strength: "UPDATE"}).
		First(task).
		Error
	if err != nil {
		return nil, err
	}
	return task, nil
}
// GetAsyncTasksByIDs retrieves multiple async tasks by their IDs
// GetAsyncTasksByIDs loads every task whose ID appears in taskIDs,
// locking the matched rows FOR UPDATE. An empty ID list short-circuits
// to a nil slice without touching the database.
func GetAsyncTasksByIDs(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTask, error) {
	if len(taskIDs) == 0 {
		return nil, nil
	}
	// Bound the query so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	var rows []orm.AsyncTask
	if err := tx.WithContext(opCtx).
		Where("task_id IN ?", taskIDs).
		Clauses(clause.Locking{Strength: "UPDATE"}).
		Find(&rows).Error; err != nil {
		return nil, err
	}
	return rows, nil
}
// UpdateAsyncTaskStatus updates the status of an async task
// UpdateAsyncTaskStatus sets the status column of the task identified
// by taskID.
func UpdateAsyncTaskStatus(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, status orm.AsyncTaskStatus) error {
	// Bound the update so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	return tx.WithContext(opCtx).
		Model(&orm.AsyncTask{}).
		Where("task_id = ?", taskID).
		Update("status", status).
		Error
}
// UpdateAsyncTaskProgress updates the progress of an async task
// UpdateAsyncTaskProgress sets the progress column of the task
// identified by taskID.
func UpdateAsyncTaskProgress(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, progress int) error {
	// Bound the update so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	return tx.WithContext(opCtx).
		Model(&orm.AsyncTask{}).
		Where("task_id = ?", taskID).
		Update("progress", progress).
		Error
}
// CompleteAsyncTask marks an async task as completed with timestamp
// CompleteAsyncTask marks the task completed, records the caller's
// finish timestamp and forces progress to 100 in a single UPDATE.
func CompleteAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
	// Bound the update so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	changes := map[string]any{
		"status":      orm.AsyncTaskStatusCompleted,
		"finished_at": timestamp,
		"progress":    100,
	}
	return tx.WithContext(opCtx).
		Model(&orm.AsyncTask{}).
		Where("task_id = ?", taskID).
		Updates(changes).
		Error
}
// FailAsyncTask marks an async task as failed with timestamp
// FailAsyncTask marks the task failed and records the caller's finish
// timestamp in a single UPDATE. Progress is intentionally left as-is.
func FailAsyncTask(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, timestamp int64) error {
	// Bound the update so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	changes := map[string]any{
		"status":      orm.AsyncTaskStatusFailed,
		"finished_at": timestamp,
	}
	return tx.WithContext(opCtx).
		Model(&orm.AsyncTask{}).
		Where("task_id = ?", taskID).
		Updates(changes).
		Error
}
// CreateAsyncTaskResult creates a result record for an async task
// CreateAsyncTaskResult inserts the result payload row for the given
// task ID.
func CreateAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
	// Bound the insert so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	row := orm.AsyncTaskResult{
		TaskID: taskID,
		Result: result,
	}
	return tx.WithContext(opCtx).Create(&row).Error
}
// UpdateAsyncTaskResultWithError updates a task result with error information
// UpdateAsyncTaskResultWithError records failure details (code, message,
// structured detail) on the task's result row and clears any previously
// stored success payload in the same UPDATE.
func UpdateAsyncTaskResultWithError(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, code int, message string, detail orm.JSONMap) error {
	// Bound the update so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	fields := map[string]any{
		"error_code":    code,
		"error_message": message,
		"error_detail":  detail,
		"result":        nil, // wipe any stale success payload
	}
	return tx.WithContext(opCtx).
		Model(&orm.AsyncTaskResult{}).
		Where("task_id = ?", taskID).
		Updates(fields).
		Error
}
// UpdateAsyncTaskResultWithSuccess updates a task result with success information
// UpdateAsyncTaskResultWithSuccess upserts the success payload for a task:
// it ensures a result row exists for taskID (creating one if absent) and
// then writes the result while clearing any previously stored error fields.
// The two-step FirstOrCreate-then-Updates sequence is intentional and
// order-sensitive — do not merge the steps.
// NOTE(review): the two steps are not atomic; a concurrent writer between
// them could interleave — safe only if callers run this inside a transaction
// (tx is expected to be one). TODO confirm.
func UpdateAsyncTaskResultWithSuccess(ctx context.Context, tx *gorm.DB, taskID uuid.UUID, result orm.JSONMap) error {
// ctx timeout judgment: bound both queries to 5s so a stalled connection
// cannot block the caller.
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// First try to update existing record, if not found create new one
existingResult := tx.WithContext(cancelCtx).
Where("task_id = ?", taskID).
FirstOrCreate(&orm.AsyncTaskResult{TaskID: taskID})
if existingResult.Error != nil {
return existingResult.Error
}
// Update with success information; error_* columns are nulled so stale
// failure data from an earlier attempt cannot survive a later success.
updateResult := tx.WithContext(cancelCtx).
Model(&orm.AsyncTaskResult{}).
Where("task_id = ?", taskID).
Updates(map[string]any{
"result": result,
"error_code": nil,
"error_message": nil,
"error_detail": nil,
})
return updateResult.Error
}
// GetAsyncTaskResult retrieves the result of an async task
// GetAsyncTaskResult retrieves the result row of an async task.
// It returns (nil, nil) when no result row exists yet, letting callers
// distinguish "result not ready" from a genuine query failure.
func GetAsyncTaskResult(ctx context.Context, tx *gorm.DB, taskID uuid.UUID) (*orm.AsyncTaskResult, error) {
	var taskResult orm.AsyncTaskResult
	// ctx timeout judgment: bound the query so a stalled connection
	// cannot block the caller.
	cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	result := tx.WithContext(cancelCtx).
		Where("task_id = ?", taskID).
		First(&taskResult)
	if result.Error != nil {
		// Use errors.Is instead of ==: it also matches wrapped
		// ErrRecordNotFound values, which a direct comparison misses.
		if errors.Is(result.Error, gorm.ErrRecordNotFound) {
			return nil, nil
		}
		return nil, result.Error
	}
	return &taskResult, nil
}
// GetAsyncTaskResults retrieves multiple task results by task IDs
// GetAsyncTaskResults fetches the result rows for every task ID given.
// An empty ID list short-circuits to a nil slice without querying.
func GetAsyncTaskResults(ctx context.Context, tx *gorm.DB, taskIDs []uuid.UUID) ([]orm.AsyncTaskResult, error) {
	if len(taskIDs) == 0 {
		return nil, nil
	}
	// Bound the query so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	var rows []orm.AsyncTaskResult
	if err := tx.WithContext(opCtx).
		Where("task_id IN ?", taskIDs).
		Find(&rows).Error; err != nil {
		return nil, err
	}
	return rows, nil
}
// GetPendingTasks retrieves pending tasks (submitted but not yet running/completed)
// GetPendingTasks returns up to limit tasks still in the submitted
// state, oldest first.
func GetPendingTasks(ctx context.Context, tx *gorm.DB, limit int) ([]orm.AsyncTask, error) {
	// Bound the query so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	var pending []orm.AsyncTask
	err := tx.WithContext(opCtx).
		Where("status = ?", orm.AsyncTaskStatusSubmitted).
		Order("created_at ASC").
		Limit(limit).
		Find(&pending).
		Error
	if err != nil {
		return nil, err
	}
	return pending, nil
}
// GetTasksByStatus retrieves tasks by status
// GetTasksByStatus returns up to limit tasks in the given status,
// oldest first.
func GetTasksByStatus(ctx context.Context, tx *gorm.DB, status orm.AsyncTaskStatus, limit int) ([]orm.AsyncTask, error) {
	// Bound the query so a stalled connection cannot block the caller.
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	var matched []orm.AsyncTask
	err := tx.WithContext(opCtx).
		Where("status = ?", status).
		Order("created_at ASC").
		Limit(limit).
		Find(&matched).
		Error
	if err != nil {
		return nil, err
	}
	return matched, nil
}
// DeleteOldTasks deletes tasks older than the specified timestamp
// DeleteOldTasks deletes tasks created before olderThan (Unix seconds),
// removing dependent result rows first so the parent delete cannot leave
// orphans (or violate a foreign key, if one exists — TODO confirm schema).
// The deletion order is intentional; do not swap the two statements.
func DeleteOldTasks(ctx context.Context, tx *gorm.DB, olderThan int64) error {
// ctx timeout judgment: a longer 30s budget than the other helpers,
// since bulk deletes over old rows can be slow.
cancelCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// First delete task results (children) via a subquery on creation time.
result := tx.WithContext(cancelCtx).
Where("task_id IN (SELECT task_id FROM async_task WHERE created_at < ?)", olderThan).
Delete(&orm.AsyncTaskResult{})
if result.Error != nil {
return result.Error
}
// Then delete tasks (parents) matching the same cutoff.
result = tx.WithContext(cancelCtx).
Where("created_at < ?", olderThan).
Delete(&orm.AsyncTask{})
return result.Error
}

View File

@ -53,8 +53,7 @@ func FillingLongTokenModel(ctx context.Context, tx *gorm.DB, identModel *model.L
func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.IndentityTokenModelInterface, error) {
identSlice := strings.Split(identToken, ".")
identSliceLen := len(identSlice)
switch identSliceLen {
case 4:
if identSliceLen == 4 {
// token1.token2.token3.token4.token7
shortIndentModel := &model.ShortIdentityTokenModel{
GridTag: identSlice[0],
@ -68,7 +67,7 @@ func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken strin
return nil, err
}
return shortIndentModel, nil
case 7:
} else if identSliceLen == 7 {
// token1.token2.token3.token4.token5.token6.token7
longIndentModel := &model.LongIdentityTokenModel{
GridTag: identSlice[0],

View File

@ -19,8 +19,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
attrSlice := strings.Split(attrToken, ".")
attrLen := len(attrSlice)
switch attrLen {
case 4:
if attrLen == 4 {
short := &model.ShortAttrInfo{
AttrGroupName: attrSlice[2],
AttrKey: attrSlice[3],
@ -36,7 +35,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
}
short.AttrValue = attrValue
return short, nil
case 7:
} else if attrLen == 7 {
long := &model.LongAttrInfo{
AttrGroupName: attrSlice[5],
AttrKey: attrSlice[6],

View File

@ -2,10 +2,11 @@
package database
import (
"context"
"sync"
"time"
"modelRT/logger"
"modelRT/orm"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -14,36 +15,32 @@ import (
var (
postgresOnce sync.Once
_globalPostgresClient *gorm.DB
_globalPostgresMu sync.RWMutex
)
// GetPostgresDBClient returns the global PostgresDB client.It's safe for concurrent use.
func GetPostgresDBClient() *gorm.DB {
return _globalPostgresClient
_globalPostgresMu.RLock()
client := _globalPostgresClient
_globalPostgresMu.RUnlock()
return client
}
// InitPostgresDBInstance return instance of PostgresDB client
func InitPostgresDBInstance(PostgresDBURI string) *gorm.DB {
func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
postgresOnce.Do(func() {
_globalPostgresClient = initPostgresDBClient(PostgresDBURI)
_globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI)
})
return _globalPostgresClient
}
// initPostgresDBClient return successfully initialized PostgresDB client
func initPostgresDBClient(PostgresDBURI string) *gorm.DB {
func initPostgresDBClient(ctx context.Context, PostgresDBURI string) *gorm.DB {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
db, err := gorm.Open(postgres.Open(PostgresDBURI), &gorm.Config{Logger: logger.NewGormLogger()})
if err != nil {
panic(err)
}
// Auto migrate async task tables
err = db.AutoMigrate(
&orm.AsyncTask{},
&orm.AsyncTaskResult{},
)
if err != nil {
panic(err)
}
return db
}

View File

@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
}
}
// QueryByZRange define func to query real time data from redis zset
func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
// QueryByZRangeByLex define func to query real time data from redis zset
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) {
client := rc.Client
args := redis.ZRangeArgs{
Key: key,

View File

@ -16,15 +16,13 @@ var (
)
// initClient define func of return successfully initialized redis client
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func initClient(rCfg config.RedisConfig) *redis.Client {
client, err := util.NewRedisClient(
rCfg.Addr,
util.WithPassword(rCfg.Password, deployEnv),
util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize),
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
)
if err != nil {
panic(err)
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
}
// InitRedisClientInstance define func of return instance of redis client
func InitRedisClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func InitRedisClientInstance(rCfg config.RedisConfig) *redis.Client {
once.Do(func() {
_globalStorageClient = initClient(rCfg, deployEnv)
_globalStorageClient = initClient(rCfg)
})
return _globalStorageClient
}

View File

@ -16,15 +16,13 @@ var (
)
// initClient define func of return successfully initialized redis client
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func initClient(rCfg config.RedisConfig) *redis.Client {
client, err := util.NewRedisClient(
rCfg.Addr,
util.WithPassword(rCfg.Password, deployEnv),
util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize),
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
)
if err != nil {
panic(err)
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
}
// InitClientInstance define func of return instance of redis client
func InitClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func InitClientInstance(rCfg config.RedisConfig) *redis.Client {
once.Do(func() {
_globalLockerClient = initClient(rCfg, deployEnv)
_globalLockerClient = initClient(rCfg)
})
return _globalLockerClient
}

4
go.mod
View File

@ -13,15 +13,14 @@ require (
github.com/json-iterator/go v1.1.12
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/panjf2000/ants/v2 v2.10.0
github.com/rabbitmq/amqp091-go v1.10.0
github.com/redis/go-redis/v9 v9.7.3
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.4
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
go.uber.org/zap v1.27.0
golang.org/x/sys v0.28.0
gorm.io/driver/mysql v1.5.7
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.12
@ -82,7 +81,6 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.28.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect

4
go.sum
View File

@ -121,8 +121,6 @@ github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xl
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
@ -164,8 +162,6 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -5,10 +5,10 @@ import (
"net/http"
"strconv"
"modelRT/alert"
"modelRT/constants"
"modelRT/logger"
"modelRT/network"
"modelRT/real-time-data/alert"
"github.com/gin-gonic/gin"
)

View File

@ -1,677 +0,0 @@
// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"net/http"
"strings"
"time"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
"modelRT/orm"
_ "modelRT/task"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"gorm.io/gorm"
)
// AsyncTaskCreateHandler handles creation of asynchronous tasks.
// It validates the request body, task type and parameters, persists a new
// task row, and returns the generated task ID. All failures are reported
// with HTTP 200 and an application-level code in the body (existing API
// convention in this handler — clients inspect Code, not the HTTP status).
// @Summary 创建异步任务
// @Description 创建新的异步任务并返回任务ID任务将被提交到队列等待处理
// @Tags AsyncTask
// @Accept json
// @Produce json
// @Param request body network.AsyncTaskCreateRequest true "任务创建请求"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskCreateResponse} "任务创建成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Router /task/async [post]
func AsyncTaskCreateHandler(c *gin.Context) {
ctx := c.Request.Context()
// Decode and bind the JSON request body.
var request network.AsyncTaskCreateRequest
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(ctx, "failed to unmarshal async task create request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid request parameters",
})
return
}
// Validate task type against the set of known types.
if !orm.IsValidAsyncTaskType(request.TaskType) {
logger.Error(ctx, "invalid task type", "task_type", request.TaskType)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task type",
})
return
}
// Validate task parameters based on task type (helper defined elsewhere
// in this package).
if !validateTaskParams(request.TaskType, request.Params) {
logger.Error(ctx, "invalid task parameters", "task_type", request.TaskType, "params", request.Params)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task parameters",
})
return
}
// Get database connection from context or use default; nil means the
// middleware that injects it did not run.
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Create task in database with a fresh UUID and submitted status.
taskType := orm.AsyncTaskType(request.TaskType)
params := orm.JSONMap(request.Params)
asyncTask, err := database.CreateAsyncTask(ctx, db, taskType, params)
if err != nil {
logger.Error(ctx, "failed to create async task in database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to create task",
})
return
}
// Create task queue message
// taskQueueMsg := task.NewTaskQueueMessage(asyncTask.TaskID, task.TaskType(request.TaskType))
// TODO: Send task to message queue (RabbitMQ/Redis)
// This should be implemented when message queue integration is ready
// For now, we'll just log the task creation
logger.Info(ctx, "async task created successfully", "task_id", asyncTask.TaskID, "task_type", request.TaskType)
// Return success response.
// NOTE(review): Code here is 2000 while the constants in this project
// define CodeSuccess = 20000 — looks like a missing zero; confirm which
// value clients expect before changing.
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task created successfully",
Payload: network.AsyncTaskCreateResponse{
TaskID: asyncTask.TaskID,
},
})
}
// AsyncTaskResultQueryHandler handles querying of asynchronous task results
// @Summary 查询异步任务结果
// @Description 根据任务ID列表查询异步任务的状态和结果
// @Tags AsyncTask
// @Accept json
// @Produce json
// @Param task_ids query string true "任务ID列表用逗号分隔"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResultQueryResponse} "查询成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Router /task/async/results [get]
func AsyncTaskResultQueryHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task IDs from query parameter
taskIDsParam := c.Query("task_ids")
if taskIDsParam == "" {
logger.Error(ctx, "task_ids parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_ids parameter is required",
})
return
}
// Parse comma-separated task IDs
var taskIDs []uuid.UUID
taskIDStrs := splitCommaSeparated(taskIDsParam)
for _, taskIDStr := range taskIDStrs {
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
return
}
taskIDs = append(taskIDs, taskID)
}
if len(taskIDs) == 0 {
logger.Error(ctx, "no valid task IDs provided")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "no valid task IDs provided",
})
return
}
// Get database connection from context or use default
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Query tasks from database
asyncTasks, err := database.GetAsyncTasksByIDs(ctx, db, taskIDs)
if err != nil {
logger.Error(ctx, "failed to query async tasks from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query tasks",
})
return
}
// Query task results from database
taskResults, err := database.GetAsyncTaskResults(ctx, db, taskIDs)
if err != nil {
logger.Error(ctx, "failed to query async task results from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task results",
})
return
}
// Create a map of task results for easy lookup
taskResultMap := make(map[uuid.UUID]orm.AsyncTaskResult)
for _, result := range taskResults {
taskResultMap[result.TaskID] = result
}
// Convert to response format
var responseTasks []network.AsyncTaskResult
for _, asyncTask := range asyncTasks {
taskResult := network.AsyncTaskResult{
TaskID: asyncTask.TaskID,
TaskType: string(asyncTask.TaskType),
Status: string(asyncTask.Status),
CreatedAt: asyncTask.CreatedAt,
FinishedAt: asyncTask.FinishedAt,
Progress: asyncTask.Progress,
}
// Add result or error information if available
if result, exists := taskResultMap[asyncTask.TaskID]; exists {
if result.Result != nil {
taskResult.Result = map[string]any(result.Result)
}
if result.ErrorCode != nil {
taskResult.ErrorCode = result.ErrorCode
}
if result.ErrorMessage != nil {
taskResult.ErrorMessage = result.ErrorMessage
}
if result.ErrorDetail != nil {
taskResult.ErrorDetail = map[string]any(result.ErrorDetail)
}
}
responseTasks = append(responseTasks, taskResult)
}
// Return success response
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "query completed",
Payload: network.AsyncTaskResultQueryResponse{
Total: len(responseTasks),
Tasks: responseTasks,
},
})
}
// AsyncTaskProgressUpdateHandler handles updating task progress (internal use, not exposed via API)
func AsyncTaskProgressUpdateHandler(c *gin.Context) {
ctx := c.Request.Context()
var request network.AsyncTaskProgressUpdate
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(ctx, "failed to unmarshal async task progress update request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid request parameters",
})
return
}
// Get database connection from context or use default
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Update task progress
err := database.UpdateAsyncTaskProgress(ctx, db, request.TaskID, request.Progress)
if err != nil {
logger.Error(ctx, "failed to update async task progress", "task_id", request.TaskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task progress",
})
return
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task progress updated successfully",
Payload: nil,
})
}
// AsyncTaskStatusUpdateHandler handles updating task status (internal use, not exposed via API)
func AsyncTaskStatusUpdateHandler(c *gin.Context) {
ctx := c.Request.Context()
var request network.AsyncTaskStatusUpdate
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(ctx, "failed to unmarshal async task status update request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid request parameters",
})
return
}
// Validate status
validStatus := map[string]bool{
string(orm.AsyncTaskStatusSubmitted): true,
string(orm.AsyncTaskStatusRunning): true,
string(orm.AsyncTaskStatusCompleted): true,
string(orm.AsyncTaskStatusFailed): true,
}
if !validStatus[request.Status] {
logger.Error(ctx, "invalid task status", "status", request.Status)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task status",
})
return
}
// Get database connection from context or use default
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Update task status
status := orm.AsyncTaskStatus(request.Status)
err := database.UpdateAsyncTaskStatus(ctx, db, request.TaskID, status)
if err != nil {
logger.Error(ctx, "failed to update async task status", "task_id", request.TaskID, "status", request.Status, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task status",
})
return
}
// If task is completed or failed, update finished_at timestamp
if request.Status == string(orm.AsyncTaskStatusCompleted) {
err = database.CompleteAsyncTask(ctx, db, request.TaskID, request.Timestamp)
} else if request.Status == string(orm.AsyncTaskStatusFailed) {
err = database.FailAsyncTask(ctx, db, request.TaskID, request.Timestamp)
}
if err != nil {
logger.Error(ctx, "failed to update async task completion timestamp", "task_id", request.TaskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to update task completion timestamp",
})
return
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task status updated successfully",
Payload: nil,
})
}
// Helper functions
func validateTaskParams(taskType string, params map[string]any) bool {
switch taskType {
case string(orm.AsyncTaskTypeTopologyAnalysis):
return validateTopologyAnalysisParams(params)
case string(orm.AsyncTaskTypePerformanceAnalysis):
return validatePerformanceAnalysisParams(params)
case string(orm.AsyncTaskTypeEventAnalysis):
return validateEventAnalysisParams(params)
case string(orm.AsyncTaskTypeBatchImport):
return validateBatchImportParams(params)
default:
return false
}
}
func validateTopologyAnalysisParams(params map[string]any) bool {
// Check required parameters for topology analysis
if startUUID, ok := params["start_uuid"]; !ok || startUUID == "" {
return false
}
if endUUID, ok := params["end_uuid"]; !ok || endUUID == "" {
return false
}
return true
}
func validatePerformanceAnalysisParams(params map[string]any) bool {
// Check required parameters for performance analysis
if componentIDs, ok := params["component_ids"]; !ok {
return false
} else if ids, isSlice := componentIDs.([]interface{}); !isSlice || len(ids) == 0 {
return false
}
return true
}
func validateEventAnalysisParams(params map[string]any) bool {
// Check required parameters for event analysis
if eventType, ok := params["event_type"]; !ok || eventType == "" {
return false
}
return true
}
func validateBatchImportParams(params map[string]any) bool {
// Check required parameters for batch import
if filePath, ok := params["file_path"]; !ok || filePath == "" {
return false
}
return true
}
func splitCommaSeparated(s string) []string {
var result []string
var current strings.Builder
inQuotes := false
escape := false
for _, ch := range s {
if escape {
current.WriteRune(ch)
escape = false
continue
}
switch ch {
case '\\':
escape = true
case '"':
inQuotes = !inQuotes
case ',':
if !inQuotes {
result = append(result, strings.TrimSpace(current.String()))
current.Reset()
} else {
current.WriteRune(ch)
}
default:
current.WriteRune(ch)
}
}
if current.Len() > 0 {
result = append(result, strings.TrimSpace(current.String()))
}
return result
}
func getDBFromContext(c *gin.Context) *gorm.DB {
// Try to get database connection from context
// This should be set by middleware
if db, exists := c.Get("db"); exists {
if gormDB, ok := db.(*gorm.DB); ok {
return gormDB
}
}
// Fallback to global database connection
// This should be implemented based on your application's database setup
// For now, return nil - actual implementation should retrieve from application context
return nil
}
// AsyncTaskResultDetailHandler handles detailed query of a single async task result
// @Summary 查询异步任务详情
// @Description 根据任务ID查询异步任务的详细状态和结果
// @Tags AsyncTask
// @Accept json
// @Produce json
// @Param task_id path string true "任务ID"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskResult} "查询成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误"
// @Failure 404 {object} network.FailureResponse "任务不存在"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Router /task/async/{task_id} [get]
func AsyncTaskResultDetailHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task ID from path parameter
taskIDStr := c.Param("task_id")
if taskIDStr == "" {
logger.Error(ctx, "task_id parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_id parameter is required",
})
return
}
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
return
}
// Get database connection from context or use default
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Query task from database
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Error(ctx, "async task not found", "task_id", taskID)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusNotFound,
Msg: "task not found",
})
return
}
logger.Error(ctx, "failed to query async task from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task",
})
return
}
// Query task result from database
taskResult, err := database.GetAsyncTaskResult(ctx, db, taskID)
if err != nil {
logger.Error(ctx, "failed to query async task result from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task result",
})
return
}
// Convert to response format
responseTask := network.AsyncTaskResult{
TaskID: asyncTask.TaskID,
TaskType: string(asyncTask.TaskType),
Status: string(asyncTask.Status),
CreatedAt: asyncTask.CreatedAt,
FinishedAt: asyncTask.FinishedAt,
Progress: asyncTask.Progress,
}
// Add result or error information if available
if taskResult != nil {
if taskResult.Result != nil {
responseTask.Result = map[string]any(taskResult.Result)
}
if taskResult.ErrorCode != nil {
responseTask.ErrorCode = taskResult.ErrorCode
}
if taskResult.ErrorMessage != nil {
responseTask.ErrorMessage = taskResult.ErrorMessage
}
if taskResult.ErrorDetail != nil {
responseTask.ErrorDetail = map[string]any(taskResult.ErrorDetail)
}
}
// Return success response
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "query completed",
Payload: responseTask,
})
}
// AsyncTaskCancelHandler handles cancellation of an async task
// @Summary 取消异步任务
// @Description 取消指定ID的异步任务如果任务尚未开始执行
// @Tags AsyncTask
// @Accept json
// @Produce json
// @Param task_id path string true "任务ID"
// @Success 200 {object} network.SuccessResponse "任务取消成功"
// @Failure 400 {object} network.FailureResponse "请求参数错误或任务无法取消"
// @Failure 404 {object} network.FailureResponse "任务不存在"
// @Failure 500 {object} network.FailureResponse "服务器内部错误"
// @Router /task/async/{task_id}/cancel [post]
func AsyncTaskCancelHandler(c *gin.Context) {
ctx := c.Request.Context()
// Parse task ID from path parameter
taskIDStr := c.Param("task_id")
if taskIDStr == "" {
logger.Error(ctx, "task_id parameter is required")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task_id parameter is required",
})
return
}
taskID, err := uuid.FromString(taskIDStr)
if err != nil {
logger.Error(ctx, "invalid task ID format", "task_id", taskIDStr, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "invalid task ID format",
})
return
}
// Get database connection from context or use default
db := getDBFromContext(c)
if db == nil {
logger.Error(ctx, "database connection not found in context")
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "database connection error",
})
return
}
// Query task from database
asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Error(ctx, "async task not found", "task_id", taskID)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusNotFound,
Msg: "task not found",
})
return
}
logger.Error(ctx, "failed to query async task from database", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to query task",
})
return
}
// Check if task can be cancelled (only SUBMITTED tasks can be cancelled)
if asyncTask.Status != orm.AsyncTaskStatusSubmitted {
logger.Error(ctx, "task cannot be cancelled", "task_id", taskID, "status", asyncTask.Status)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: "task cannot be cancelled (already running or completed)",
})
return
}
// Update task status to failed with cancellation reason
timestamp := time.Now().Unix()
err = database.FailAsyncTask(ctx, db, taskID, timestamp)
if err != nil {
logger.Error(ctx, "failed to cancel async task", "task_id", taskID, "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusInternalServerError,
Msg: "failed to cancel task",
})
return
}
// Update task result with cancellation error
err = database.UpdateAsyncTaskResultWithError(ctx, db, taskID, 40003, "task cancelled by user", orm.JSONMap{
"cancelled_at": timestamp,
"cancelled_by": "user",
})
if err != nil {
logger.Error(ctx, "failed to update task result with cancellation error", "task_id", taskID, "error", err)
// Continue anyway since task is already marked as failed
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: 2000,
Msg: "task cancelled successfully",
})
}

View File

@ -3,7 +3,7 @@ package handler
import (
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
@ -16,7 +16,7 @@ func AttrDeleteHandler(c *gin.Context) {
var request network.AttrDeleteRequest
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{

View File

@ -3,7 +3,7 @@ package handler
import (
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
@ -17,7 +17,7 @@ func AttrGetHandler(c *gin.Context) {
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{

View File

@ -3,7 +3,7 @@ package handler
import (
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
@ -17,7 +17,7 @@ func AttrSetHandler(c *gin.Context) {
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{

View File

@ -7,7 +7,6 @@ import (
"fmt"
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/database"
"modelRT/diagram"
@ -44,7 +43,7 @@ func DiagramNodeLinkHandler(c *gin.Context) {
var request network.DiagramNodeLinkRequest
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
@ -168,7 +167,7 @@ func processLinkSetData(ctx context.Context, action string, level int, prevLinkS
err2 = prevLinkSet.SREM(prevMember)
}
default:
err := common.ErrUnsupportedLinkAction
err := constants.ErrUnsupportedLinkAction
logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err)
return err
}

View File

@ -30,14 +30,3 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
}
c.JSON(http.StatusOK, resp)
}
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
resp := network.WSResponse{
Code: code,
Msg: msg,
}
if payload != nil {
resp.Payload = payload
}
c.JSON(http.StatusOK, resp)
}

View File

@ -6,10 +6,10 @@ import (
"net/http"
"strconv"
"modelRT/alert"
"modelRT/constants"
"modelRT/logger"
"modelRT/network"
"modelRT/real-time-data/alert"
"github.com/gin-gonic/gin"
)

View File

@ -4,7 +4,7 @@ package handler
import (
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/database"
"modelRT/diagram"
"modelRT/logger"
@ -19,7 +19,7 @@ func MeasurementGetHandler(c *gin.Context) {
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{

View File

@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"modelRT/common"
"modelRT/constants"
"modelRT/database"
"modelRT/diagram"
@ -21,7 +20,7 @@ func MeasurementLinkHandler(c *gin.Context) {
var request network.MeasurementLinkRequest
clientToken := c.GetString("client_token")
if clientToken == "" {
err := common.ErrGetClientToken
err := constants.ErrGetClientToken
logger.Error(c, "failed to get client token from context", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
@ -94,7 +93,7 @@ func MeasurementLinkHandler(c *gin.Context) {
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
}
default:
err = common.ErrUnsupportedLinkAction
err = constants.ErrUnsupportedLinkAction
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
}

View File

@ -39,14 +39,20 @@ func PullRealTimeDataHandler(c *gin.Context) {
if clientID == "" {
err := fmt.Errorf("clientID is missing from the path")
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
defer conn.Close()
@ -54,18 +60,9 @@ func PullRealTimeDataHandler(c *gin.Context) {
ctx, cancel := context.WithCancel(c.Request.Context())
defer cancel()
conn.SetCloseHandler(func(code int, text string) error {
logger.Info(c.Request.Context(), "websocket processor shutdown trigger",
"clientID", clientID, "code", code, "reason", text)
// call cancel to notify other goroutines to stop working
cancel()
return nil
})
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
go readClientMessages(ctx, conn, clientID, cancel)
@ -82,33 +79,52 @@ func PullRealTimeDataHandler(c *gin.Context) {
select {
case targetData, ok := <-fanInChan:
if !ok {
sendChan <- network.WSResponse{
Code: constants.RespCodeServerError,
Msg: "abnormal shutdown of data fan-in channel",
}
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
return
}
buffer = append(buffer, targetData)
if len(buffer) >= bufferMaxSize {
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
// buffer is full, send immediately
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
// reset the ticker to prevent it from triggering immediately after the ticker is sent
ticker.Reset(sendMaxInterval)
}
case <-ticker.C:
if len(buffer) > 0 {
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
// when the ticker is triggered, all data in the send buffer is sent
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
}
case <-ctx.Done():
// last refresh before exiting
// send the last remaining data
if len(buffer) > 0 {
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
}
}
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
return
}
}
}
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
// conn.SetReadLimit(512)
for {
@ -133,47 +149,54 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
}
}
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
if len(*buffer) == 0 {
return
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
if len(targetsData) == 0 {
return nil
}
resp := network.WSResponse{
Code: constants.RespCodeSuccess,
Msg: "process completed",
response := network.SuccessResponse{
Code: 200,
Msg: "success",
Payload: network.RealTimePullPayload{
Targets: *buffer,
Targets: targetsData,
},
}
select {
case sendChan <- resp:
default:
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
}
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
return conn.WriteJSON(response)
}
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
defer func() {
if r := recover(); r != nil {
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
}
}()
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for resp := range sendChan {
if err := conn.WriteJSON(resp); err != nil {
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
for targetsData := range sendChan {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
err := conn.WriteJSON(map[string]any{
"code": 2101,
"msg": "all targets removed in given client_id",
"payload": map[string]int{
"active_targets_count": 0,
},
})
if err != nil {
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
cancel()
}
continue
}
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
cancel()
return
}
}
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
}
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
// ensure the fanInChan will not leak
defer close(fanInChan)
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
stopChanMap := make(map[string]chan struct{})
s.globalMutex.RLock()
@ -360,7 +383,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
}
// removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
for _, target := range removeTargets {
stopChan, exists := stopChanMap[target]
if !exists {
@ -379,18 +402,17 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
}
}
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
resp := network.WSResponse{
Code: constants.RespCodeSuccessWithNoSub,
Msg: "all targets removed",
Payload: map[string]int{"active_targets_count": 0},
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
specialTarget := network.RealTimePullTarget{
ID: constants.SysCtrlAllRemoved,
Datas: []network.RealTimePullData{},
}
select {
case sendChan <- resp:
case sendChan <- []network.RealTimePullTarget{specialTarget}:
logger.Info(ctx, "sent 2101 status request to sendChan")
default:
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
}
}
@ -401,6 +423,7 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) {
close(stopChan)
}
clear(stopChanMap)
return
}
// redisPollingConfig define struct for param which query real time data from redis
@ -440,7 +463,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
}
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
return

View File

@ -168,6 +168,7 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran
}
transportChannel <- subPoss
}
return
}
// messageTypeToString define func of auxiliary to convert message type to string

View File

@ -5,9 +5,9 @@ import (
"context"
"fmt"
"maps"
"net/http"
"sync"
"modelRT/common"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
@ -33,42 +33,42 @@ func init() {
// @Accept json
// @Produce json
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
// @Success 2000 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
//
// @Example 2000 {
// "code": 2000,
// "msg": "process completed",
// @Example 200 {
// "code": 200,
// "msg": "success",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
// "code": "20000",
// "code": "1001",
// "msg": "subscription success"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
// "code": "20000",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
// }
// }
//
// @Failure 3000 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
//
// @Example 3000 {
// "code": 3000,
// "msg": "process completed with partial failures",
// @Example 400 {
// "code": 400,
// "msg": "failed to get recommend data from redis",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
// "code": "40005",
// "code": "1002",
// "msg": "subscription failed"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
// "code": "50001",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
@ -83,7 +83,10 @@ func RealTimeSubHandler(c *gin.Context) {
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(c, "failed to unmarshal real time query request", "error", err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
@ -92,7 +95,10 @@ func RealTimeSubHandler(c *gin.Context) {
id, err := uuid.NewV4()
if err != nil {
logger.Error(c, "failed to generate client id", "error", err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
clientID = id.String()
@ -117,74 +123,110 @@ func RealTimeSubHandler(c *gin.Context) {
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "create real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubStopAction:
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
if err != nil {
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubAppendAction:
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "append target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubUpdateAction:
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "update target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
default:
err := fmt.Errorf("%w: request action is %s", common.ErrUnsupportedSubAction, request.Action)
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), network.RealTimeSubPayload{
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
@ -241,12 +283,12 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.CodeFoundTargetFailed
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.CodeSuccess
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
@ -285,7 +327,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
if _, exist := config.targetContext[target]; !exist {
err := fmt.Errorf("target %s does not exists in subscription list", target)
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
targetResult.Code = constants.CodeUpdateSubTargetMissing
targetResult.Code = constants.UpdateSubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
@ -294,13 +336,13 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.CodeDBQueryFailed
targetResult.Code = constants.UpdateSubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.CodeSuccess
targetResult.Code = constants.UpdateSubSuccessCode
targetResult.Msg = constants.UpdateSubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
@ -431,7 +473,7 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
if !exist {
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeAppendSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
@ -465,7 +507,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS
for index := range resultsSlice {
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
resultsSlice[index].Code = constants.CodeSubTargetRepeat
resultsSlice[index].Code = constants.SubRepeatCode
resultsSlice[index].Msg = constants.SubRepeatMsg
}
}
@ -533,7 +575,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeCancelSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
s.globalMutex.RUnlock()
@ -553,7 +595,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
for _, target := range measTargets {
targetResult := network.TargetResult{
ID: target,
Code: constants.CodeCancelSubTargetMissing,
Code: constants.CancelSubFailedCode,
Msg: constants.CancelSubFailedMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -574,7 +616,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
targetResult := network.TargetResult{
ID: existingTarget,
Code: constants.CodeSuccess,
Code: constants.CancelSubSuccessCode,
Msg: constants.CancelSubSuccessMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -597,7 +639,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
for target := range targetsToRemoveMap {
targetResult := network.TargetResult{
ID: target,
Code: constants.CodeCancelSubTargetMissing,
Code: constants.CancelSubFailedCode,
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -621,15 +663,17 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
// UpdateTargets define function to update targets in SharedSubState
func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) {
requestTargetsCount := processRealTimeRequestCount(measurements)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
s.globalMutex.RLock()
config, exist := s.subMap[clientID]
s.globalMutex.RUnlock()
if !exist {
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
@ -678,13 +722,13 @@ func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem)
return totalTargetsCount
}
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, businessCode int, err error) []network.TargetResult {
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
targetProcessResults := make([]network.TargetResult, 0, targetCount)
for _, measurementItem := range measurements {
for _, target := range measurementItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetResult.Code = businessCode
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
}

34
main.go
View File

@ -13,15 +13,13 @@ import (
"path/filepath"
"syscall"
"modelRT/alert"
"modelRT/config"
"modelRT/constants"
"modelRT/database"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/mq"
"modelRT/pool"
"modelRT/real-time-data/alert"
"modelRT/router"
"modelRT/util"
@ -100,14 +98,14 @@ func main() {
panic(err)
}
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceName, modelRTConfig.SecretKey)
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceConfig.ServiceName, modelRTConfig.ServiceConfig.SecretKey)
if err != nil {
logger.Error(ctx, "generate client token failed", "error", err)
panic(err)
}
// init postgresDBClient
postgresDBClient = database.InitPostgresDBInstance(modelRTConfig.PostgresDBURI)
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
defer func() {
sqlDB, err := postgresDBClient.DB()
@ -129,17 +127,13 @@ func main() {
defer parsePool.Release()
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
if err != nil {
logger.Error(ctx, "init redigo pool failed", "error", err)
panic(err)
}
defer searchPool.Close()
model.InitAutocompleterWithPool(searchPool)
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig, modelRTConfig.DeployEnv)
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig)
defer storageClient.Close()
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig, modelRTConfig.DeployEnv)
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig)
defer lockerClient.Close()
// init anchor param ants pool
@ -150,11 +144,6 @@ func main() {
}
defer anchorRealTimePool.Release()
// init rabbitmq connection
mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig)
// async push event to rabbitMQ
go mq.PushUpDownLimitEventToRabbitMQ(ctx, mq.MsgChan)
postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
@ -204,7 +193,7 @@ func main() {
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
panic(err)
}
go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement)
go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement)
tree, err := database.QueryTopologicFromDB(ctx, tx)
if err != nil {
@ -215,10 +204,8 @@ func main() {
return nil
})
// use release mode in production
if modelRTConfig.DeployEnv == constants.ProductionDeployMode {
gin.SetMode(gin.ReleaseMode)
}
// use release mode in productio
// gin.SetMode(gin.ReleaseMode)
engine := gin.New()
router.RegisterRoutes(engine, serviceToken)
@ -236,7 +223,7 @@ func main() {
// }
server := http.Server{
Addr: modelRTConfig.ServiceAddr,
Addr: modelRTConfig.ServiceConfig.ServiceAddr,
Handler: engine,
}
@ -245,12 +232,9 @@ func main() {
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-done
logger.Info(ctx, "shutdown signal received, cleaning up...")
if err := server.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "shutdown serverError", "err", err)
}
mq.CloseRabbitProxy()
logger.Info(ctx, "resources cleaned up, exiting")
}()
logger.Info(ctx, "starting ModelRT server")

View File

@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"modelRT/common"
"modelRT/constants"
)
@ -62,7 +61,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
switch prefix {
case constants.ChannelPrefixTelemetry:
if number > 10 {
return "", common.ErrExceedsLimitType
return "", constants.ErrExceedsLimitType
}
var builder strings.Builder
numberStr := strconv.Itoa(number)
@ -87,7 +86,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
channelName := builder.String()
return channelName, nil
default:
return "", common.ErrUnsupportedChannelPrefixType
return "", constants.ErrUnsupportedChannelPrefixType
}
}
@ -165,14 +164,14 @@ func (m MeasurementDataSource) GetIOAddress() (IOAddress, error) {
if addr, ok := m.IOAddress.(CL3611Address); ok {
return addr, nil
}
return nil, common.ErrInvalidAddressType
return nil, constants.ErrInvalidAddressType
case constants.DataSourceTypePower104:
if addr, ok := m.IOAddress.(Power104Address); ok {
return addr, nil
}
return nil, common.ErrInvalidAddressType
return nil, constants.ErrInvalidAddressType
default:
return nil, common.ErrUnknownDataType
return nil, constants.ErrUnknownDataType
}
}

View File

@ -1,39 +0,0 @@
// Package event define real time data evnet operation functions
package event
// EventRecord define struct for CIM event record
type EventRecord struct {
// 事件名称
EventName string `json:"event"`
// 事件唯一标识符
EventUUID string `json:"event_uuid"`
// 事件类型
Type int `json:"type"`
// 事件优先级 (0-9)
Priority int `json:"priority"`
// 事件状态
Status int `json:"status"`
// 可选模板参数
Category string `json:"category,omitempty"`
// 毫秒级时间戳 (Unix epoch)
Timestamp int64 `json:"timestamp"`
// 事件来源 (station, platform, msa)
From string `json:"from"`
// 事件场景描述对象 (如阈值、当前值)
Condition map[string]any `json:"condition"`
// 与事件相关的订阅信息
AttachedSubscriptions []any `json:"attached_subscriptions"`
// 事件分析结果对象
Result map[string]any `json:"result,omitempty"`
// 操作历史记录 (CIM ActivityRecord)
Operations []OperationRecord `json:"operations"`
// 子站告警原始数据 (CIM Alarm 数据)
Origin map[string]any `json:"origin,omitempty"`
}
// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等
type OperationRecord struct {
Action string `json:"action"` // 执行的动作,如 "acknowledgment"
Op string `json:"op"` // 操作人/操作账号标识
TS int64 `json:"ts"` // 操作发生的毫秒时间戳
}

View File

@ -1,82 +0,0 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"modelRT/common"
"modelRT/logger"
)
type actionHandler func(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error)
// actionDispatchMap define variable to store all action handler into map
var actionDispatchMap = map[string]actionHandler{
"info": handleInfoAction,
"warning": handleWarningAction,
"error": handleErrorAction,
"critical": handleCriticalAction,
"exception": handleExceptionAction,
}
// TriggerEventAction define func to trigger event by action in compute config
func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) (*EventRecord, error) {
handler, exists := actionDispatchMap[command]
if !exists {
logger.Error(ctx, "unknown action command", "command", command)
return nil, common.ErrUnknowEventActionCommand
}
eventRecord, err := handler(ctx, eventName, ops...)
if err != nil {
logger.Error(ctx, "action event handler failed", "error", err)
return nil, common.ErrExecEventActionFailed
}
return eventRecord, nil
}
func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
logger.Info(ctx, "trigger info event", "event_name", eventName)
eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
if err != nil {
logger.Error(ctx, "generate info event record failed", "error", err)
return nil, err
}
return eventRecord, nil
}
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
logger.Info(ctx, "trigger warning event", "event_name", eventName)
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
if err != nil {
logger.Error(ctx, "generate warning event record failed", "error", err)
return nil, err
}
return eventRecord, nil
}
func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
logger.Info(ctx, "trigger error event", "event_name", eventName)
eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
if err != nil {
logger.Error(ctx, "generate error event record failed", "error", err)
return nil, err
}
return eventRecord, nil
}
func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send critical level event using actionParams ...
logger.Warn(ctx, "trigger critical event", "message", actionParams)
return nil, nil
}
func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
// 实际执行发送警告、记录日志等操作
actionParams := content
// ... logic to send except level event using actionParams ...
logger.Warn(ctx, "trigger except event", "message", actionParams)
return nil, nil
}

View File

@ -1,85 +0,0 @@
// Package event define real time data evnet operation functions
package event
import (
"maps"
"strings"
)
// EventOption define option function type for event record creation
type EventOption func(*EventRecord)
// WithCondition define option function to set event condition description
func WithCondition(cond map[string]any) EventOption {
return func(e *EventRecord) {
if cond != nil {
e.Condition = cond
}
}
}
// WithSubscriptions define option function to set event attached subscription information
func WithSubscriptions(subs []any) EventOption {
return func(e *EventRecord) {
if subs != nil {
e.AttachedSubscriptions = subs
}
}
}
// WithOperations define option function to set event operation records
func WithOperations(ops []OperationRecord) EventOption {
return func(e *EventRecord) {
if ops != nil {
e.Operations = ops
}
}
}
// WithCategory define option function to set event category
func WithCategory(cat string) EventOption {
return func(e *EventRecord) {
e.Category = cat
}
}
// WithResult define option function to set event analysis result
func WithResult(result map[string]any) EventOption {
return func(e *EventRecord) {
e.Result = result
}
}
func WithTEAnalysisResult(breachType string) EventOption {
return func(e *EventRecord) {
if e.Result == nil {
e.Result = make(map[string]any)
}
description := "数据异常"
switch strings.ToLower(breachType) {
case "upup":
description = "超越上上限"
case "up":
description = "超越上限"
case "down":
description = "超越下限"
case "downdown":
description = "超越下下限"
}
e.Result["analysis_desc"] = description
e.Result["breach_type"] = breachType
}
}
// WithConditionValue define option function to set event condition with real time value and extra data
func WithConditionValue(realTimeValue []float64, extraData map[string]any) EventOption {
return func(e *EventRecord) {
if e.Condition == nil {
e.Condition = make(map[string]any)
}
e.Condition["real_time_value"] = realTimeValue
maps.Copy(e.Condition, extraData)
}
}

View File

@ -1,68 +0,0 @@
// Package event define real time data evnet operation functions
package event
import (
"fmt"
"time"
"modelRT/constants"
"github.com/gofrs/uuid"
)
// NewPlatformEventRecord define func to create a new platform event record with common fields initialized
func NewPlatformEventRecord(eventType int, priority int, eventName string, opts ...EventOption) (*EventRecord, error) {
u, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("failed to generate UUID: %w", err)
}
record := &EventRecord{
EventName: eventName,
EventUUID: u.String(),
Type: eventType,
Priority: priority,
Status: 1,
From: constants.EventFromPlatform,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Condition: make(map[string]any),
AttachedSubscriptions: make([]any, 0),
Operations: make([]OperationRecord, 0),
}
for _, opt := range opts {
opt(record)
}
return record, nil
}
// NewGeneralPlatformSoftRecord define func to create a new general platform software event record
func NewGeneralPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventGeneralPlatformSoft), 0, name, opts...)
}
// NewGeneralApplicationSoftRecord define func to create a new general application software event record
func NewGeneralApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventGeneralApplicationSoft), 0, name, opts...)
}
// NewWarnPlatformSoftRecord define func to create a new warning platform software event record
func NewWarnPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventWarnPlatformSoft), 3, name, opts...)
}
// NewWarnApplicationSoftRecord define func to create a new warning application software event record
func NewWarnApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventWarnApplicationSoft), 3, name, opts...)
}
// NewCriticalPlatformSoftRecord define func to create a new critical platform software event record
func NewCriticalPlatformSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventCriticalPlatformSoft), 6, name, opts...)
}
// NewCriticalApplicationSoftRecord define func to create a new critical application software event record
func NewCriticalApplicationSoftRecord(name string, opts ...EventOption) (*EventRecord, error) {
return NewPlatformEventRecord(int(constants.EventCriticalApplicationSoft), 6, name, opts...)
}

View File

@ -1,146 +0,0 @@
// Package mq provides read or write access to message queue services
package mq
import (
"context"
"encoding/json"
"time"
"modelRT/constants"
"modelRT/logger"
"modelRT/mq/event"
amqp "github.com/rabbitmq/amqp091-go"
)
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
var MsgChan chan *event.EventRecord
func init() {
MsgChan = make(chan *event.EventRecord, 10000)
}
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
var channel *amqp.Channel
var err error
channel, err = GetConn().Channel()
if err != nil {
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
return nil, err
}
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
return nil, err
}
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
return nil, err
}
err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil)
if err != nil {
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
return nil, err
}
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare event exchange failed", "error", err)
return nil, err
}
args := amqp.Table{
"x-max-length": int32(50),
"x-dead-letter-exchange": constants.EventDeadExchangeName,
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
}
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
if err != nil {
logger.Error(ctx, "declare event queue failed", "error", err)
return nil, err
}
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
if err != nil {
logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err)
return nil, err
}
if err := channel.Confirm(false); err != nil {
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
return nil, err
}
return channel, nil
}
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) {
channel, err := initUpDownLimitEventChannel(ctx)
if err != nil {
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
return
}
// TODO 使用配置修改确认模式通道参数
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
go func() {
for {
select {
case confirm, ok := <-confirms:
if !ok {
return
}
if !confirm.Ack {
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
}
case <-ctx.Done():
return
}
}
}()
for {
select {
case <-ctx.Done():
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
channel.Close()
return
case eventRecord, ok := <-msgChan:
if !ok {
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
channel.Close()
return
}
// TODO 将消息的序列化移动到发送之前以便使用eventRecord的category来作为routing key
recordBytes, err := json.Marshal(eventRecord)
if err != nil {
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
continue
}
// send event alarm message to rabbitMQ queue
routingKey := eventRecord.Category
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
err = channel.PublishWithContext(pubCtx,
constants.EventExchangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: recordBytes,
})
cancel()
if err != nil {
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err)
}
}
}
}

View File

@ -1,217 +0,0 @@
// Package mq define message queue operation functions
package mq
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"sync"
"time"
"modelRT/config"
"modelRT/logger"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/youmark/pkcs8"
)
var (
_globalRabbitMQProxy *RabbitMQProxy
rabbitMQOnce sync.Once
)
// RabbitMQProxy define stuct of rabbitMQ connection proxy
type RabbitMQProxy struct {
tlsConf *tls.Config
conn *amqp.Connection
cancel context.CancelFunc
mu sync.Mutex
}
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
type rabbitMQCertConf struct {
serverName string
insecureSkipVerify bool
clientCert tls.Certificate
caCertPool *x509.CertPool
}
// GetConn define func to return the rabbitMQ connection
func GetConn() *amqp.Connection {
_globalRabbitMQProxy.mu.Lock()
defer _globalRabbitMQProxy.mu.Unlock()
return _globalRabbitMQProxy.conn
}
// InitRabbitProxy return instance of rabbitMQ connection
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
amqpURI := generateRabbitMQURI(rCfg)
tlsConf, err := initCertConf(rCfg)
if err != nil {
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
panic(err)
}
rabbitMQOnce.Do(func() {
cancelCtx, cancel := context.WithCancel(ctx)
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
})
return _globalRabbitMQProxy
}
// initRabbitMQ return instance of rabbitMQ connection
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
TLSClientConfig: tlsConf,
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
Heartbeat: 10 * time.Second,
})
if err != nil {
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
panic(err)
}
return conn
}
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
for {
closeChan := make(chan *amqp.Error)
GetConn().NotifyClose(closeChan)
select {
case <-ctx.Done():
logger.Info(ctx, "context cancelled, exiting handleReconnect")
return
case err, ok := <-closeChan:
if !ok {
logger.Info(ctx, "rabbitMQ notify channel closed")
return
}
if err == nil {
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
return
}
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
}
if !p.reconnect(ctx, rabbitMQURI) {
return
}
}
}
// reconnect dials the broker in a loop, waiting 5 seconds between attempts.
// It returns true once a new connection has been installed on the proxy and
// false when ctx is cancelled before that happens.
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
	for {
		logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
		select {
		case <-ctx.Done():
			return false
		case <-time.After(5 * time.Second):
		}
		newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
			TLSClientConfig: p.tlsConf,
			SASL:            []amqp.Authentication{&amqp.ExternalAuth{}},
			Heartbeat:       10 * time.Second,
		})
		if err != nil {
			logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
			continue
		}
		// publish the fresh connection under the proxy lock so readers
		// (GetConn) never observe a torn update
		p.mu.Lock()
		p.conn = newConn
		p.mu.Unlock()
		logger.Info(ctx, "rabbitMQ reconnected successfully")
		return true
	}
}
// CloseRabbitProxy cancels the reconnect goroutine and closes the current
// rabbitMQ connection, doing nothing if the proxy was never initialized.
func CloseRabbitProxy() {
	proxy := _globalRabbitMQProxy
	if proxy == nil {
		return
	}
	proxy.cancel()
	proxy.mu.Lock()
	defer proxy.mu.Unlock()
	if proxy.conn != nil {
		proxy.conn.Close()
	}
}
// generateRabbitMQURI builds the amqps:// endpoint from the host and port in
// the rabbitMQ config. Authentication is certificate based (SASL EXTERNAL),
// so no user/password pair is embedded in the URI.
// TODO: split out user/password config items to support other auth schemes:
//   user := url.QueryEscape(rCfg.User)
//   password := url.QueryEscape(rCfg.Password)
//   fmt.Sprintf("amqps://%s:%s@%s:%d/", user, password, rCfg.Host, rCfg.Port)
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
	return fmt.Sprintf("amqps://%s:%d/", rCfg.Host, rCfg.Port)
}
// initCertConf builds the mutual-TLS configuration for the rabbitMQ
// connection: the CA certificate becomes the root pool used to verify the
// broker, and the client certificate plus its password-protected PKCS#8
// private key become the client identity. The key is decrypted via the
// pkcs8 package and re-encoded as an unencrypted PEM block because
// tls.X509KeyPair cannot consume encrypted keys directly.
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
	tlsConf := &tls.Config{
		InsecureSkipVerify: rCfg.InsecureSkipVerify,
		ServerName:         rCfg.ServerName,
	}
	// CA certificate used to verify the broker's server certificate
	caCert, err := os.ReadFile(rCfg.CACertPath)
	if err != nil {
		return nil, fmt.Errorf("read server ca file failed: %w", err)
	}
	caCertPool := x509.NewCertPool()
	if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
		return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
	}
	tlsConf.RootCAs = caCertPool
	// client certificate (PEM) presented to the broker
	certPEM, err := os.ReadFile(rCfg.ClientCertPath)
	if err != nil {
		return nil, fmt.Errorf("read client cert file failed: %w", err)
	}
	keyData, err := os.ReadFile(rCfg.ClientKeyPath)
	if err != nil {
		return nil, fmt.Errorf("read private key file failed: %w", err)
	}
	block, _ := pem.Decode(keyData)
	if block == nil {
		return nil, fmt.Errorf("failed to decode PEM block from private key")
	}
	// decrypt the password-protected PKCS#8 key with the configured password
	der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
	if err != nil {
		return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
	}
	// re-marshal to plain (unencrypted) PKCS#8 so tls.X509KeyPair accepts it
	privBytes, err := x509.MarshalPKCS8PrivateKey(der)
	if err != nil {
		return nil, fmt.Errorf("marshal private key failed: %w", err)
	}
	keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
	clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, fmt.Errorf("create x509 key pair failed: %w", err)
	}
	tlsConf.Certificates = []tls.Certificate{clientCert}
	return tlsConf, nil
}

View File

@ -1,95 +0,0 @@
// Package network define struct of network operation
package network

import (
	"time"

	"github.com/gofrs/uuid"
)

// AsyncTaskCreateRequest defines the request structure for creating an asynchronous task
type AsyncTaskCreateRequest struct {
	// required: true
	// enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT
	TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"异步任务类型"`
	// required: true
	// Shape depends on TaskType; see the *Params structs below.
	Params map[string]interface{} `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"`
}

// AsyncTaskCreateResponse defines the response structure for creating an asynchronous task
type AsyncTaskCreateResponse struct {
	TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
}

// AsyncTaskResultQueryRequest defines the request structure for querying task results
type AsyncTaskResultQueryRequest struct {
	// required: true
	TaskIDs []uuid.UUID `json:"task_ids" swaggertype:"array,string" example:"[\"123e4567-e89b-12d3-a456-426614174000\",\"223e4567-e89b-12d3-a456-426614174001\"]" description:"任务ID列表"`
}

// AsyncTaskResult defines the structure for a single task result.
// NOTE(review): the timestamp examples (1741846200) suggest unix seconds — confirm with producers.
type AsyncTaskResult struct {
	TaskID       uuid.UUID              `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
	TaskType     string                 `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"`
	Status       string                 `json:"status" example:"COMPLETED" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
	Progress     *int                   `json:"progress,omitempty" example:"65" description:"任务进度(0-100)仅当状态为RUNNING时返回"`
	CreatedAt    int64                  `json:"created_at" example:"1741846200" description:"任务创建时间戳"`
	FinishedAt   *int64                 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳仅当状态为COMPLETED或FAILED时返回"`
	Result       map[string]interface{} `json:"result,omitempty" swaggertype:"object" description:"任务结果仅当状态为COMPLETED时返回"`
	ErrorCode    *int                   `json:"error_code,omitempty" example:"400102" description:"错误码仅当状态为FAILED时返回"`
	ErrorMessage *string                `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息仅当状态为FAILED时返回"`
	ErrorDetail  map[string]interface{} `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情仅当状态为FAILED时返回"`
}

// AsyncTaskResultQueryResponse defines the response structure for querying task results
type AsyncTaskResultQueryResponse struct {
	Total int               `json:"total" example:"3" description:"查询的任务总数"`
	Tasks []AsyncTaskResult `json:"tasks" description:"任务结果列表"`
}

// AsyncTaskProgressUpdate defines the structure for task progress update
type AsyncTaskProgressUpdate struct {
	TaskID   uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
	Progress int       `json:"progress" example:"50" description:"任务进度(0-100)"`
}

// AsyncTaskStatusUpdate defines the structure for task status update
type AsyncTaskStatusUpdate struct {
	TaskID    uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
	Status    string    `json:"status" example:"RUNNING" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
	Timestamp int64     `json:"timestamp" example:"1741846205" description:"状态更新时间戳"`
}

// TopologyAnalysisParams defines the parameters for topology analysis task
type TopologyAnalysisParams struct {
	StartUUID string `json:"start_uuid" example:"comp-001" description:"起始元件UUID"`
	EndUUID   string `json:"end_uuid" example:"comp-999" description:"目标元件UUID"`
}

// PerformanceAnalysisParams defines the parameters for performance analysis task
type PerformanceAnalysisParams struct {
	ComponentIDs []string `json:"component_ids" example:"[\"comp-001\",\"comp-002\"]" description:"需要分析的元件ID列表"`
	TimeRange    struct {
		Start time.Time `json:"start" example:"2026-03-01T00:00:00Z" description:"分析开始时间"`
		End   time.Time `json:"end" example:"2026-03-02T00:00:00Z" description:"分析结束时间"`
	} `json:"time_range" description:"分析时间范围"`
}

// EventAnalysisParams defines the parameters for event analysis task
type EventAnalysisParams struct {
	EventType  string    `json:"event_type" example:"MOTOR_START" description:"事件类型"`
	StartTime  time.Time `json:"start_time" example:"2026-03-01T00:00:00Z" description:"事件开始时间"`
	EndTime    time.Time `json:"end_time" example:"2026-03-02T00:00:00Z" description:"事件结束时间"`
	Components []string  `json:"components,omitempty" example:"[\"comp-001\",\"comp-002\"]" description:"关联的元件列表"`
}

// BatchImportParams defines the parameters for batch import task
type BatchImportParams struct {
	FilePath string `json:"file_path" example:"/data/import/model.csv" description:"导入文件路径"`
	FileType string `json:"file_type" example:"CSV" description:"文件类型CSV, JSON, XML"`
	Options  struct {
		Overwrite  bool `json:"overwrite" example:"false" description:"是否覆盖现有数据"`
		Validate   bool `json:"validate" example:"true" description:"是否进行数据验证"`
		NotifyUser bool `json:"notify_user" example:"true" description:"是否通知用户"`
	} `json:"options" description:"导入选项"`
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"time"
"modelRT/common"
"modelRT/common/errcode"
"modelRT/constants"
"modelRT/orm"
@ -65,10 +64,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
switch info.ChangeType {
case constants.UUIDFromChangeType:
if info.NewUUIDFrom == info.OldUUIDFrom {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT1)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT1)
}
if info.NewUUIDTo != info.OldUUIDTo {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT1)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT1)
}
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
@ -91,10 +90,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
UUIDChangeInfo.NewUUIDTo = OldUUIDTo
case constants.UUIDToChangeType:
if info.NewUUIDFrom != info.OldUUIDFrom {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT2)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT2)
}
if info.NewUUIDTo == info.OldUUIDTo {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT2)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT2)
}
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
@ -117,10 +116,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
UUIDChangeInfo.NewUUIDTo = newUUIDTo
case constants.UUIDAddChangeType:
if info.OldUUIDFrom != "" {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT3)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT3)
}
if info.OldUUIDTo != "" {
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT3)
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT3)
}
newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom)

View File

@ -3,25 +3,18 @@ package network
// FailureResponse define struct of standard failure API response format
type FailureResponse struct {
Code int `json:"code" example:"3000"`
Msg string `json:"msg" example:"process completed with partial failures"`
Code int `json:"code" example:"500"`
Msg string `json:"msg" example:"failed to get recommend data from redis"`
Payload any `json:"payload" swaggertype:"object"`
}
// SuccessResponse define struct of standard successful API response format
type SuccessResponse struct {
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Code int `json:"code" example:"200"`
Msg string `json:"msg" example:"success"`
Payload any `json:"payload" swaggertype:"object"`
}
// WSResponse define struct of standard websocket API response format
type WSResponse struct {
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Payload any `json:"payload,omitempty" swaggertype:"object"`
}
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
type MeasurementRecommendPayload struct {
Input string `json:"input" example:"transformfeeder1_220."`
@ -33,7 +26,7 @@ type MeasurementRecommendPayload struct {
// TargetResult define struct of target item in real time data subscription response payload
type TargetResult struct {
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
Code int `json:"code" example:"20000"`
Code string `json:"code" example:"1001"`
Msg string `json:"msg" example:"subscription success"`
}

View File

@ -1,115 +0,0 @@
// Package orm define database data struct
package orm

import (
	"github.com/gofrs/uuid"
)

// AsyncTaskType defines the type of asynchronous task
type AsyncTaskType string

const (
	// AsyncTaskTypeTopologyAnalysis represents topology analysis task
	AsyncTaskTypeTopologyAnalysis AsyncTaskType = "TOPOLOGY_ANALYSIS"
	// AsyncTaskTypePerformanceAnalysis represents performance analysis task
	AsyncTaskTypePerformanceAnalysis AsyncTaskType = "PERFORMANCE_ANALYSIS"
	// AsyncTaskTypeEventAnalysis represents event analysis task
	AsyncTaskTypeEventAnalysis AsyncTaskType = "EVENT_ANALYSIS"
	// AsyncTaskTypeBatchImport represents batch import task
	AsyncTaskTypeBatchImport AsyncTaskType = "BATCH_IMPORT"
)

// AsyncTaskStatus defines the status of asynchronous task
// (lifecycle: SUBMITTED -> RUNNING -> COMPLETED | FAILED)
type AsyncTaskStatus string

const (
	// AsyncTaskStatusSubmitted represents task has been submitted to queue
	AsyncTaskStatusSubmitted AsyncTaskStatus = "SUBMITTED"
	// AsyncTaskStatusRunning represents task is currently executing
	AsyncTaskStatusRunning AsyncTaskStatus = "RUNNING"
	// AsyncTaskStatusCompleted represents task completed successfully
	AsyncTaskStatusCompleted AsyncTaskStatus = "COMPLETED"
	// AsyncTaskStatusFailed represents task failed with error
	AsyncTaskStatusFailed AsyncTaskStatus = "FAILED"
)

// AsyncTask defines the core task entity stored in database for task lifecycle tracking
type AsyncTask struct {
	TaskID   uuid.UUID       `gorm:"column:task_id;primaryKey;type:uuid;default:gen_random_uuid()"`
	TaskType AsyncTaskType   `gorm:"column:task_type;type:varchar(50);not null"`
	Status   AsyncTaskStatus `gorm:"column:status;type:varchar(20);not null;index"`
	Params   JSONMap         `gorm:"column:params;type:jsonb"`
	// CreatedAt / FinishedAt are stored as integers — presumably unix seconds; confirm with writers
	CreatedAt  int64  `gorm:"column:created_at;not null;index"`
	FinishedAt *int64 `gorm:"column:finished_at;index"`
	Progress   *int   `gorm:"column:progress"` // 0-100, nullable
}

// TableName returns the table name for AsyncTask model
func (a *AsyncTask) TableName() string {
	return "async_task"
}

// SetSubmitted marks the task as submitted
func (a *AsyncTask) SetSubmitted() {
	a.Status = AsyncTaskStatusSubmitted
}

// SetRunning marks the task as running
func (a *AsyncTask) SetRunning() {
	a.Status = AsyncTaskStatusRunning
}

// SetCompleted marks the task as completed with finished timestamp
// and forces progress to 100.
func (a *AsyncTask) SetCompleted(timestamp int64) {
	a.Status = AsyncTaskStatusCompleted
	a.FinishedAt = &timestamp
	a.setProgress(100)
}

// SetFailed marks the task as failed with finished timestamp.
// NOTE(review): unlike SetCompleted, Progress is left at its last value here — confirm intended.
func (a *AsyncTask) SetFailed(timestamp int64) {
	a.Status = AsyncTaskStatusFailed
	a.FinishedAt = &timestamp
}

// setProgress updates the task progress, clamping the value into [0, 100]
func (a *AsyncTask) setProgress(value int) {
	if value < 0 {
		value = 0
	}
	if value > 100 {
		value = 100
	}
	a.Progress = &value
}

// UpdateProgress updates the task progress with validation
func (a *AsyncTask) UpdateProgress(value int) {
	a.setProgress(value)
}

// IsCompleted checks if the task is completed
func (a *AsyncTask) IsCompleted() bool {
	return a.Status == AsyncTaskStatusCompleted
}

// IsRunning checks if the task is running
func (a *AsyncTask) IsRunning() bool {
	return a.Status == AsyncTaskStatusRunning
}

// IsFailed checks if the task failed
func (a *AsyncTask) IsFailed() bool {
	return a.Status == AsyncTaskStatusFailed
}

// IsValidAsyncTaskType checks if the task type string matches a known AsyncTaskType value
func IsValidAsyncTaskType(taskType string) bool {
	switch AsyncTaskType(taskType) {
	case AsyncTaskTypeTopologyAnalysis, AsyncTaskTypePerformanceAnalysis,
		AsyncTaskTypeEventAnalysis, AsyncTaskTypeBatchImport:
		return true
	default:
		return false
	}
}

View File

@ -1,70 +0,0 @@
// Package orm define database data struct
package orm

import (
	"github.com/gofrs/uuid"
)

// AsyncTaskResult stores computation results, separate from AsyncTask model for flexibility.
// Exactly one of Result or the Error* fields is expected to be populated
// (enforced by SetSuccess / SetError below).
type AsyncTaskResult struct {
	TaskID       uuid.UUID `gorm:"column:task_id;primaryKey;type:uuid"`
	Result       JSONMap   `gorm:"column:result;type:jsonb"`
	ErrorCode    *int      `gorm:"column:error_code"`
	ErrorMessage *string   `gorm:"column:error_message;type:text"`
	ErrorDetail  JSONMap   `gorm:"column:error_detail;type:jsonb"`
}

// TableName returns the table name for AsyncTaskResult model
func (a *AsyncTaskResult) TableName() string {
	return "async_task_result"
}

// SetSuccess sets the result for successful task execution and clears any error fields
func (a *AsyncTaskResult) SetSuccess(result JSONMap) {
	a.Result = result
	a.ErrorCode = nil
	a.ErrorMessage = nil
	a.ErrorDetail = nil
}

// SetError sets the error information for failed task execution and clears the result
func (a *AsyncTaskResult) SetError(code int, message string, detail JSONMap) {
	a.Result = nil
	a.ErrorCode = &code
	a.ErrorMessage = &message
	a.ErrorDetail = detail
}

// HasError checks if the task result contains an error
// (true when either the code or the message is set)
func (a *AsyncTaskResult) HasError() bool {
	return a.ErrorCode != nil || a.ErrorMessage != nil
}

// GetErrorCode returns the error code or 0 if no error
func (a *AsyncTaskResult) GetErrorCode() int {
	if a.ErrorCode == nil {
		return 0
	}
	return *a.ErrorCode
}

// GetErrorMessage returns the error message or empty string if no error
func (a *AsyncTaskResult) GetErrorMessage() string {
	if a.ErrorMessage == nil {
		return ""
	}
	return *a.ErrorMessage
}

// IsSuccess checks if the task execution was successful
func (a *AsyncTaskResult) IsSuccess() bool {
	return !a.HasError()
}

// Clear clears all result data
func (a *AsyncTaskResult) Clear() {
	a.Result = nil
	a.ErrorCode = nil
	a.ErrorMessage = nil
	a.ErrorDetail = nil
}

View File

@ -5,11 +5,11 @@ import (
"fmt"
"time"
"modelRT/alert"
"modelRT/config"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/real-time-data/alert"
"github.com/panjf2000/ants/v2"
)

View File

@ -9,8 +9,7 @@ import (
"modelRT/constants"
"modelRT/logger"
"modelRT/mq"
"modelRT/mq/event"
"modelRT/real-time-data/event"
)
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
@ -27,13 +26,6 @@ type teEventThresholds struct {
isFloatCause bool
}
type teBreachTrigger struct {
breachType string
triggered bool
triggeredValues []float64
eventOpts []event.EventOption
}
// parseTEThresholds define func to parse telemetry thresholds by casue map
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
t := teEventThresholds{}
@ -92,74 +84,60 @@ func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeCo
// analyzeTEDataLogic define func to processing telemetry data and event triggering
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount
dataLen := len(realTimeValues)
if dataLen < windowSize || windowSize <= 0 {
if windowSize <= 0 {
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
return
}
statusArray := make([]string, dataLen)
for i, val := range realTimeValues {
statusArray[i] = getTEBreachType(val, thresholds)
// mark whether any events have been triggered in this batch
var eventTriggered bool
breachTriggers := map[string]bool{
"up": false, "upup": false, "down": false, "downdown": false,
}
breachTriggers := make(map[string]teBreachTrigger)
for i := 0; i <= dataLen-windowSize; i++ {
firstBreachType := statusArray[i]
// implement slide window to determine breach counts
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
window := realTimeValues[i : i+windowSize]
firstValueBreachType := getTEBreachType(window[0], thresholds)
// if the first value in the window does not breach, skip this window directly
if firstBreachType == "" {
if firstValueBreachType == "" {
continue
}
allMatch := true
for j := 1; j < windowSize; j++ {
if statusArray[i+j] != firstBreachType {
currentValueBreachType := getTEBreachType(window[j], thresholds)
if currentValueBreachType != firstValueBreachType {
allMatch = false
break
}
}
if allMatch {
triggerValues := realTimeValues[i : i+windowSize]
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
_, exists := breachTriggers[firstBreachType]
if !exists {
logger.Warn(ctx, "event triggered by sliding window",
"breach_type", firstBreachType,
"trigger_values", triggerValues)
if !breachTriggers[firstValueBreachType] {
// trigger event
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
// build Options
opts := []event.EventOption{
event.WithConditionValue(triggerValues, conf.Cause),
event.WithTEAnalysisResult(firstBreachType),
event.WithCategory(constants.EventWarnUpDownLimitCategroy),
// TODO 生成 operations并考虑如何放入 event 中
// event.WithOperations(nil)
}
breachTriggers[firstBreachType] = teBreachTrigger{
breachType: firstBreachType,
triggered: false,
triggeredValues: triggerValues,
eventOpts: opts,
}
breachTriggers[firstValueBreachType] = true
eventTriggered = true
}
}
}
for breachType, trigger := range breachTriggers {
// trigger Action
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
eventRecord, err := event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
if err != nil {
logger.Error(ctx, "trigger event action failed", "error", err)
if eventTriggered {
command, content := genTEEventCommandAndContent(ctx, conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
mq.MsgChan <- eventRecord
event.TriggerEventAction(ctx, command, content)
return
}
}
func genTEEventCommandAndMainBody(ctx context.Context, action map[string]any) (command string, mainBody string) {
func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
logger.Error(ctx, "can not find command variable into action map", "action", action)
@ -207,7 +185,7 @@ type tiEventThresholds struct {
isFloatCause bool
}
// parseTIThresholds define func to parse telesignal thresholds by casue map
// parseTEThresholds define func to parse telesignal thresholds by casue map
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
edgeKey := "edge"
t := tiEventThresholds{
@ -233,12 +211,11 @@ func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
switch t.edge {
case constants.TelesignalRaising:
if t.edge == constants.TelesignalRaising {
if previousValue == 0.0 && currentValue == 1.0 {
return constants.TIBreachTriggerType
}
case constants.TelesignalFalling:
} else if t.edge == constants.TelesignalFalling {
if previousValue == 1.0 && currentValue == 0.0 {
return constants.TIBreachTriggerType
}
@ -320,22 +297,18 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
}
if eventTriggered {
command, mainBody := genTIEventCommandAndMainBody(conf.Action)
if command == "" || mainBody == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
command, content := genTIEventCommandAndContent(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
eventRecord, err := event.TriggerEventAction(ctx, command, mainBody)
if err != nil {
logger.Error(ctx, "trigger event action failed", "error", err)
return
}
mq.MsgChan <- eventRecord
event.TriggerEventAction(ctx, command, content)
return
}
}
func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) {
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
return "", ""

View File

@ -0,0 +1,74 @@
// Package event define real time data evnet operation functions
package event
import (
"context"
"modelRT/logger"
)
// actionHandler is the signature shared by all event action handlers.
type actionHandler func(ctx context.Context, content string) error

// actionDispatchMap define variable to store all action handler into map;
// it maps the "command" string from a compute-config action to its handler.
// Unknown commands are rejected by TriggerEventAction.
var actionDispatchMap = map[string]actionHandler{
	"info":      handleInfoAction,
	"warning":   handleWarningAction,
	"error":     handleErrorAction,
	"critical":  handleCriticalAction,
	"exception": handleExceptionAction,
}
// TriggerEventAction looks up the handler registered for command in
// actionDispatchMap and runs it with the given content. Unknown commands and
// handler failures are only logged; nothing is returned to the caller.
func TriggerEventAction(ctx context.Context, command string, content string) {
	handler, ok := actionDispatchMap[command]
	if !ok {
		logger.Error(ctx, "unknown action command", "command", command)
		return
	}
	if err := handler(ctx, content); err != nil {
		logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err)
		return
	}
	logger.Info(ctx, "action handler success", "command", command, "content", content)
}
// handleInfoAction emits an info level event for the given content.
// Placeholder: real delivery (alerting, persistence, ...) is not implemented
// yet; the event is currently only logged.
func handleInfoAction(ctx context.Context, content string) error {
	actionParams := content
	// ... logic to send info level event using actionParams ...
	// log at info level to match the event severity (previously logger.Warn)
	logger.Info(ctx, "trigger info event", "message", actionParams)
	return nil
}
// handleWarningAction emits a warning level event for the given content.
// Placeholder implementation: the event is currently only logged.
func handleWarningAction(ctx context.Context, content string) error {
	// actually perform sending the alert, recording logs, etc.
	actionParams := content
	// ... logic to send warning level event using actionParams ...
	logger.Warn(ctx, "trigger warning event", "message", actionParams)
	return nil
}
// handleErrorAction emits an error level event for the given content.
// Placeholder: real delivery (alerting, persistence, ...) is not implemented
// yet; the event is currently only logged.
func handleErrorAction(ctx context.Context, content string) error {
	actionParams := content
	// ... logic to send error level event using actionParams ...
	// log at error level to match the event severity (previously logger.Warn)
	logger.Error(ctx, "trigger error event", "message", actionParams)
	return nil
}
// handleCriticalAction emits a critical level event for the given content.
// Placeholder implementation: the event is currently only logged.
// NOTE(review): logged at Warn level — confirm whether critical should map higher.
func handleCriticalAction(ctx context.Context, content string) error {
	// actually perform sending the alert, recording logs, etc.
	actionParams := content
	// ... logic to send critical level event using actionParams ...
	logger.Warn(ctx, "trigger critical event", "message", actionParams)
	return nil
}
// handleExceptionAction emits an exception level event for the given content.
// Placeholder implementation: the event is currently only logged.
func handleExceptionAction(ctx context.Context, content string) error {
	// actually perform sending the alert, recording logs, etc.
	actionParams := content
	// ... logic to send exception level event using actionParams ...
	logger.Warn(ctx, "trigger except event", "message", actionParams)
	return nil
}

View File

@ -0,0 +1,400 @@
// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"errors"
"fmt"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/orm"
"modelRT/util"
)
var (
	// RealTimeDataChan define channel of real time data receive
	RealTimeDataChan chan network.RealTimeDataReceiveRequest

	// globalComputeState tracks the ComputeConfig of every measurement that
	// currently has a computing goroutine running.
	globalComputeState *MeasComputeState
)

// init creates the buffered (capacity 100) receive channel and the global
// compute-state registry before any other package code runs.
func init() {
	RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
	globalComputeState = NewMeasComputeState()
}
// StartRealTimeDataComputing starts one computing goroutine per measurement
// whose event plan is enabled. For each enabled measurement it builds a
// ComputeConfig, registers it in the global state, and launches
// continuousComputation with a per-measurement stop channel and a context
// carrying the measurement UUID.
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
	for _, measurement := range measurements {
		enableValue, exist := measurement.EventPlan["enable"]
		if !exist {
			logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		// check the type assertion before reading enable: with the old order
		// a non-bool value fell into the "not needed" branch and this error
		// branch was unreachable
		enable, ok := enableValue.(bool)
		if !ok {
			logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
			continue
		}
		if !enable {
			logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		conf, err := initComputeConfig(measurement)
		if err != nil {
			logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
			continue
		}
		if conf == nil {
			logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		uuidStr := measurement.ComponentUUID.String()
		enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
		conf.StopGchan = make(chan struct{})
		globalComputeState.Store(uuidStr, conf)
		logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
		go continuousComputation(enrichedCtx, conf)
	}
}
// initComputeConfig builds the ComputeConfig for one measurement from its
// EventPlan. It returns (nil, nil) when the "enable" field is absent or
// false, and an error when any required field ("cause", "action") is missing
// or malformed. The analyzer kind is selected from the cause keys: any float
// threshold key (up/down/upup/downdown) yields the telemetry (TE) analyzer,
// otherwise the telesignal (TI, edge-based) analyzer is used.
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
	var err error
	enableValue, exist := measurement.EventPlan["enable"]
	enable, ok := enableValue.(bool)
	if !exist {
		return nil, nil
	}
	if !ok {
		return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
	}
	if !enable {
		return nil, nil
	}
	conf := &ComputeConfig{}
	causeValue, exist := measurement.EventPlan["cause"]
	if !exist {
		return nil, errors.New("missing required field cause")
	}
	cause, ok := causeValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
	}
	conf.Cause, err = processCauseMap(cause)
	if err != nil {
		return nil, fmt.Errorf("parse content of field cause failed:%w", err)
	}
	actionValue, exist := measurement.EventPlan["action"]
	if !exist {
		return nil, errors.New("missing required field action")
	}
	action, ok := actionValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
	}
	conf.Action = action
	// redis key the computing goroutine will poll for this measurement's samples
	queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
	if err != nil {
		return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
	}
	conf.QueryKey = queryKey
	conf.DataSize = int64(measurement.Size)
	// TODO use constant values for temporary settings
	conf.minBreachCount = constants.MinBreachCount
	// TODO: optimize how the duration is created later (currently hard-coded)
	conf.Duration = 10
	// presence of any float-threshold key marks this as a telemetry plan
	isFloatCause := false
	if _, exists := conf.Cause["up"]; exists {
		isFloatCause = true
	} else if _, exists := conf.Cause["down"]; exists {
		isFloatCause = true
	} else if _, exists := conf.Cause["upup"]; exists {
		isFloatCause = true
	} else if _, exists := conf.Cause["downdown"]; exists {
		isFloatCause = true
	}
	if isFloatCause {
		// te config
		teThresholds, err := parseTEThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
		}
		conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
	} else {
		// ti config
		tiThresholds, err := parseTIThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
		}
		conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
	}
	return conf, nil
}
// processCauseMap validates and extracts the event cause configuration.
// Telemetry causes carry float64 thresholds under any of the keys
// up/down/upup/downdown; telesignal causes carry the string key "edge" with
// value "raising" or "falling". The matching subset is returned; anything
// else is an error.
//
// Bug fix: the valid-edge path previously set causeResult but then fell
// through to a final "cause map is invalid" error return, so a telesignal
// cause could never be parsed successfully.
func processCauseMap(data map[string]any) (map[string]any, error) {
	causeResult := make(map[string]any)
	keysToExtract := []string{"up", "down", "upup", "downdown"}
	var foundFloatKey bool
	for _, key := range keysToExtract {
		if value, exists := data[key]; exists {
			foundFloatKey = true
			// check value type
			if floatVal, ok := value.(float64); ok {
				causeResult[key] = floatVal
			} else {
				return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
			}
		}
	}
	if foundFloatKey {
		return causeResult, nil
	}
	edgeKey := "edge"
	value, exists := data[edgeKey]
	if !exists {
		return nil, fmt.Errorf("key:%s do not exists", edgeKey)
	}
	stringVal, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
	}
	switch stringVal {
	case "raising", "falling":
		causeResult[edgeKey] = stringVal
	default:
		return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
	}
	return causeResult, nil
}
// continuousComputation polls redis on a fixed ticker and feeds each batch of
// samples to the measurement's analyzer, until either the per-measurement
// stop channel or the parent context ends the goroutine.
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
	client := diagram.NewRedisClient()
	// uuid was stored in the context by StartRealTimeDataComputing; empty string if absent
	uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
	duration := util.SecondsToDuration(conf.Duration)
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	for {
		select {
		case <-conf.StopGchan:
			logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
			return
		case <-ctx.Done():
			logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
			return
		case <-ticker.C:
			// fetch the latest DataSize samples for this measurement
			members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize)
			if err != nil {
				logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
				continue
			}
			realTimedatas := util.ConvertZSetMembersToFloat64(members)
			if conf.Analyzer != nil {
				conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
			} else {
				logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
			}
		}
	}
}
// // ReceiveChan define func to real time data receive and process
// func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
// consumer, err := kafka.NewConsumer(consumerConfig)
// if err != nil {
// logger.Error(ctx, "create kafka consumer failed", "error", err)
// return
// }
// defer consumer.Close()
// err = consumer.SubscribeTopics(topics, nil)
// if err != nil {
// logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err)
// return
// }
// batchSize := 100
// batchTimeout := util.SecondsToDuration(duration)
// messages := make([]*kafka.Message, 0, batchSize)
// lastCommit := time.Now()
// logger.Info(ctx, "start consuming from kafka", "topic", topics)
// for {
// select {
// case <-ctx.Done():
// logger.Info(ctx, "stop real time data computing by context cancel")
// return
// case realTimeData := <-RealTimeDataChan:
// componentUUID := realTimeData.PayLoad.ComponentUUID
// component, err := diagram.GetComponentMap(componentUUID)
// if err != nil {
// 			logger.Error(ctx, "query component info from diagram map by component id failed", "component_uuid", componentUUID, "error", err)
// continue
// }
// componentType := component.Type
// if componentType != constants.DemoType {
// logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID)
// continue
// }
// var anchorName string
// var compareValUpperLimit, compareValLowerLimit float64
// var anchorRealTimeData []float64
// var calculateFunc func(archorValue float64, args ...float64) float64
// // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)
// for _, param := range realTimeData.PayLoad.Values {
// anchorRealTimeData = append(anchorRealTimeData, param.Value)
// }
// anchorConfig := config.AnchorParamConfig{
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
// ComponentUUID: componentUUID,
// AnchorName: anchorName,
// CompareValUpperLimit: compareValUpperLimit,
// CompareValLowerLimit: compareValLowerLimit,
// AnchorRealTimeData: anchorRealTimeData,
// },
// CalculateFunc: calculateFunc,
// CalculateParams: []float64{},
// }
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
// if err != nil {
// logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err)
// continue
// }
// anchorChan <- anchorConfig
// default:
// msg, err := consumer.ReadMessage(batchTimeout)
// if err != nil {
// if err.(kafka.Error).Code() == kafka.ErrTimedOut {
// // process accumulated messages when timeout
// if len(messages) > 0 {
// processMessageBatch(ctx, messages)
// consumer.Commit()
// messages = messages[:0]
// }
// continue
// }
// logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg)
// continue
// }
// messages = append(messages, msg)
// // process messages when batch size or timeout period is reached
// if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
// processMessageBatch(ctx, messages)
// consumer.Commit()
// messages = messages[:0]
// lastCommit = time.Now()
// }
// }
// }
// }
// type realTimeDataPayload struct {
// ComponentUUID string
// Values []float64
// }
// type realTimeData struct {
// Payload realTimeDataPayload
// }
// func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
// var realTimeData realTimeData
// err := json.Unmarshal(msgValue, &realTimeData)
// if err != nil {
// return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
// }
// return &realTimeData, nil
// }
// func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
// componentUUID := realTimeData.Payload.ComponentUUID
// component, err := diagram.GetComponentMap(componentUUID)
// if err != nil {
// logger.Error(ctx, "query component info from diagram map by component id failed",
// "component_uuid", componentUUID, "error", err)
// return
// }
// componentType := component.Type
// if componentType != constants.DemoType {
// logger.Error(ctx, "can not process real time data of component type not equal DemoType",
// "component_uuid", componentUUID)
// return
// }
// var anchorName string
// var compareValUpperLimit, compareValLowerLimit float64
// var anchorRealTimeData []float64
// var calculateFunc func(archorValue float64, args ...float64) float64
// for _, param := range realTimeData.Payload.Values {
// anchorRealTimeData = append(anchorRealTimeData, param)
// }
// anchorConfig := config.AnchorParamConfig{
// AnchorParamBaseConfig: config.AnchorParamBaseConfig{
// ComponentUUID: componentUUID,
// AnchorName: anchorName,
// CompareValUpperLimit: compareValUpperLimit,
// CompareValLowerLimit: compareValLowerLimit,
// AnchorRealTimeData: anchorRealTimeData,
// },
// CalculateFunc: calculateFunc,
// CalculateParams: []float64{},
// }
// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID)
// if err != nil {
// logger.Error(ctx, "get anchor param chan failed",
// "component_uuid", componentUUID, "error", err)
// return
// }
// select {
// case anchorChan <- anchorConfig:
// case <-ctx.Done():
// logger.Info(ctx, "context done while sending to anchor chan")
// case <-time.After(5 * time.Second):
// logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID)
// }
// }
// // processMessageBatch define func to batch process kafka message
// func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
// for _, msg := range messages {
// realTimeData, err := parseKafkaMessage(msg.Value)
// if err != nil {
// logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg)
// continue
// }
// go processRealTimeData(ctx, realTimeData)
// }
// }

View File

@ -1,229 +0,0 @@
// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"errors"
"fmt"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/orm"
"modelRT/util"
)
var (
	// RealTimeDataChan define channel of real time data receive
	RealTimeDataChan chan network.RealTimeDataReceiveRequest
	// globalComputeState holds the ComputeConfig (including its stop channel)
	// for every measurement whose computing goroutine is running, keyed by
	// the measurement UUID string.
	globalComputeState *MeasComputeState
)
// init allocates the package-level receive channel and the global
// per-measurement compute-state store before any handler runs.
func init() {
	// Buffer of 100 decouples producers from the processing goroutines;
	// senders block once the buffer is full.
	RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
	globalComputeState = NewMeasComputeState()
}
// StartComputingRealTimeDataLimit starts, for every measurement whose event
// plan is enabled, a background goroutine that continuously computes real time
// data limits. Disabled, absent, or malformed plans are logged and skipped;
// successfully started measurements are registered in globalComputeState so
// they can be stopped individually later.
func StartComputingRealTimeDataLimit(ctx context.Context, measurements []orm.Measurement) {
	for _, measurement := range measurements {
		enableValue, exist := measurement.EventPlan["enable"]
		if !exist {
			logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		// Check the type assertion BEFORE using the value: with the previous
		// ordering, a non-bool "enable" yielded enable==false, took the
		// "disabled" branch first, and made the !ok error branch unreachable.
		enable, ok := enableValue.(bool)
		if !ok {
			logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
			continue
		}
		if !enable {
			logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		conf, err := initComputeConfig(measurement)
		if err != nil {
			logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
			continue
		}
		if conf == nil {
			logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
			continue
		}
		uuidStr := measurement.ComponentUUID.String()
		// Carry the measurement UUID in the context so the goroutine can log it.
		enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
		conf.StopGchan = make(chan struct{})
		globalComputeState.Store(uuidStr, conf)
		logger.Info(ctx, "starting computing real time data limit for measurement", "measurement_uuid", measurement.ComponentUUID)
		go continuousComputation(enrichedCtx, conf)
	}
}
// initComputeConfig builds a ComputeConfig from a measurement's event plan.
//
// It returns (nil, nil) when the plan has no "enable" field or is disabled,
// an error when the plan is malformed, and a fully populated config —
// including the matching threshold Analyzer — otherwise.
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
	enableValue, exist := measurement.EventPlan["enable"]
	if !exist {
		return nil, nil
	}
	enable, ok := enableValue.(bool)
	if !ok {
		return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
	}
	if !enable {
		return nil, nil
	}
	conf := &ComputeConfig{}
	causeValue, exist := measurement.EventPlan["cause"]
	if !exist {
		return nil, errors.New("missing required field cause")
	}
	cause, ok := causeValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
	}
	var err error
	conf.Cause, err = processCauseMap(cause)
	if err != nil {
		return nil, fmt.Errorf("parse content of field cause failed:%w", err)
	}
	actionValue, exist := measurement.EventPlan["action"]
	if !exist {
		return nil, errors.New("missing required field action")
	}
	action, ok := actionValue.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
	}
	conf.Action = action
	queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
	if err != nil {
		return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
	}
	conf.QueryKey = queryKey
	conf.DataSize = int64(measurement.Size)
	// TODO use constant values for temporary settings
	conf.minBreachCount = constants.MinBreachCount
	// TODO optimize how the tick duration is configured (currently fixed 10s)
	conf.Duration = 10
	// Telemetry (TE) causes carry at least one numeric threshold key;
	// telesignal (TI) causes carry an "edge" key instead.
	isFloatCause := false
	for _, key := range []string{"up", "down", "upup", "downdown"} {
		if _, exists := conf.Cause[key]; exists {
			isFloatCause = true
			break
		}
	}
	if isFloatCause {
		teThresholds, err := parseTEThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
		}
		conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
	} else {
		tiThresholds, err := parseTIThresholds(conf.Cause)
		if err != nil {
			return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
		}
		conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
	}
	return conf, nil
}
// processCauseMap validates the "cause" section of an event plan and returns
// a normalized copy containing only the recognized keys.
//
// Two mutually exclusive shapes are accepted:
//   - telemetry: any of the float64 keys "up", "down", "upup", "downdown";
//   - telesignal: the string key "edge" with value "raising" or "falling".
//
// An error is returned when a recognized key has the wrong type, when "edge"
// carries an unknown value, or when none of the keys are present.
func processCauseMap(data map[string]any) (map[string]any, error) {
	causeResult := make(map[string]any)
	keysToExtract := []string{"up", "down", "upup", "downdown"}
	var foundFloatKey bool
	for _, key := range keysToExtract {
		value, exists := data[key]
		if !exists {
			continue
		}
		foundFloatKey = true
		floatVal, ok := value.(float64)
		if !ok {
			return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
		}
		causeResult[key] = floatVal
	}
	if foundFloatKey {
		return causeResult, nil
	}
	edgeKey := "edge"
	value, exists := data[edgeKey]
	if !exists {
		return nil, fmt.Errorf("key:%s do not exists", edgeKey)
	}
	stringVal, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
	}
	switch stringVal {
	case "raising", "falling":
		// BUG FIX: previously a valid edge cause fell through to a generic
		// "cause map is invalid" error instead of returning the result.
		causeResult[edgeKey] = stringVal
		return causeResult, nil
	default:
		return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
	}
}
// continuousComputation is the long-running analysis loop for a single
// measurement. Every conf.Duration seconds it pulls the latest conf.DataSize
// samples from redis under conf.QueryKey and feeds them to conf.Analyzer.
//
// Termination: conf.StopGchan ends this measurement's loop individually;
// ctx.Done ends it on service-wide shutdown.
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
	client := diagram.NewRedisClient()
	// Measurement UUID is injected into ctx by the caller; used for logging only.
	uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
	duration := util.SecondsToDuration(conf.Duration)
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	for {
		select {
		case <-conf.StopGchan:
			logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
			return
		case <-ctx.Done():
			logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
			return
		case <-ticker.C:
			// Bound each redis query so one slow call cannot stall the loop
			// past the next tick.
			queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
			members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
			cancel()
			if err != nil {
				logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
				continue
			}
			realTimedatas := util.ConvertZSetMembersToFloat64(members)
			// Skip the cycle entirely when the window is empty.
			if len(realTimedatas) == 0 {
				logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
				continue
			}
			if conf.Analyzer != nil {
				conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
			} else {
				logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
			}
		}
	}
}

View File

@ -1,32 +0,0 @@
// Package router provides router config
package router
import (
"modelRT/handler"
"github.com/gin-gonic/gin"
)
// registerAsyncTaskRoutes wires the async-task endpoints onto the given router
// group, applying the supplied middlewares to every route.
func registerAsyncTaskRoutes(rg *gin.RouterGroup, middlewares ...gin.HandlerFunc) {
	taskGroup := rg.Group("/task/")
	taskGroup.Use(middlewares...)

	// Public async task API: create, list results, inspect detail, cancel.
	taskGroup.POST("async", handler.AsyncTaskCreateHandler)
	taskGroup.GET("async/results", handler.AsyncTaskResultQueryHandler)
	taskGroup.GET("async/:task_id", handler.AsyncTaskResultDetailHandler)
	taskGroup.POST("async/:task_id/cancel", handler.AsyncTaskCancelHandler)

	// Worker-facing endpoints for progress/status updates (not exposed to
	// external users).
	internalGroup := taskGroup.Group("internal/")
	internalGroup.Use(middlewares...)
	internalGroup.POST("async/progress", handler.AsyncTaskProgressUpdateHandler)
	internalGroup.POST("async/status", handler.AsyncTaskStatusUpdateHandler)
}

View File

@ -27,5 +27,4 @@ func RegisterRoutes(engine *gin.Engine, clientToken string) {
registerDataRoutes(routeGroup)
registerMonitorRoutes(routeGroup)
registerComponentRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
registerAsyncTaskRoutes(routeGroup, middleware.SetTokenMiddleware(clientToken))
}

View File

@ -0,0 +1,97 @@
package sharememory
import (
"fmt"
"unsafe"
"modelRT/orm"
"golang.org/x/sys/unix"
)
// CreateShareMemory creates (or opens) a System V shared memory segment of
// structSize bytes identified by key and attaches it to this process.
// It returns the attached address; the caller is responsible for detaching
// (SYS_SHMDT) and eventually removing the segment.
func CreateShareMemory(key uintptr, structSize uintptr) (uintptr, error) {
	// Create the segment if it does not exist yet, read/write for everyone.
	// Syscall returns a unix.Errno, not a plain error; 0 means success.
	shmID, _, errno := unix.Syscall(unix.SYS_SHMGET, key, structSize, unix.IPC_CREAT|0o666)
	if errno != 0 {
		return 0, fmt.Errorf("create shared memory failed:%w", errno)
	}
	// Map the segment into the current process address space.
	shmAddr, _, errno := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
	if errno != 0 {
		return 0, fmt.Errorf("attach shared memory failed:%w", errno)
	}
	return shmAddr, nil
}
// ReadComponentFromShareMemory copies the orm.Component stored in the System V
// shared memory segment identified by key into componentInfo, then detaches.
//
// NOTE(review): this assumes the segment holds a plain orm.Component written
// by a cooperating process on the same architecture; fields backed by Go
// pointers/slices/maps would not survive a cross-process shallow copy —
// confirm the struct layout before relying on this.
func ReadComponentFromShareMemory(key uintptr, componentInfo *orm.Component) error {
	structSize := unsafe.Sizeof(orm.Component{})
	shmID, _, errno := unix.Syscall(unix.SYS_SHMGET, key, structSize, 0o666)
	if errno != 0 {
		return fmt.Errorf("get shared memory failed:%w", errno)
	}
	shmAddr, _, errno := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
	if errno != 0 {
		return fmt.Errorf("attach shared memory failed:%w", errno)
	}
	// BUG FIX: the previous code re-pointed the local parameter at
	// shmAddr+structSize (past the stored struct) and never copied anything
	// back to the caller. Copy the struct from the start of the segment into
	// the caller-supplied value before detaching.
	*componentInfo = *(*orm.Component)(unsafe.Pointer(shmAddr))
	// Detach shared memory.
	unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
	return nil
}
// WriteComponentInShareMemory copies componentInfo into the System V shared
// memory segment identified by key so other processes can read it back.
//
// NOTE(review): as with the read side, this shallow-copies the struct; fields
// backed by Go pointers/slices/maps are not safe to share across processes —
// confirm the orm.Component layout before relying on this.
func WriteComponentInShareMemory(key uintptr, componentInfo *orm.Component) error {
	structSize := unsafe.Sizeof(orm.Component{})
	shmID, _, errno := unix.Syscall(unix.SYS_SHMGET, key, structSize, 0o666)
	if errno != 0 {
		return fmt.Errorf("get shared memory failed:%w", errno)
	}
	shmAddr, _, errno := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0)
	if errno != 0 {
		return fmt.Errorf("attach shared memory failed:%w", errno)
	}
	// BUG FIX: the previous code only read a value at a wrong offset
	// (shmAddr + sizeof(uintptr)) and printed it; componentInfo was never
	// stored. Write the struct to the start of the segment.
	*(*orm.Component)(unsafe.Pointer(shmAddr)) = *componentInfo
	// Detach shared memory.
	unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0)
	return nil
}
// DeleteShareMemory removes the System V shared memory segment identified by
// key. The kernel destroys the segment once the last attached process detaches.
func DeleteShareMemory(key uintptr) error {
	// Look up the segment id for the key; removal operates on the id,
	// not on the key itself.
	shmID, _, errno := unix.Syscall(unix.SYS_SHMGET, key, 0, 0o666)
	if errno != 0 {
		return fmt.Errorf("get shared memory failed:%w", errno)
	}
	// BUG FIX: shm_unlink targets POSIX name-based shared memory, not the
	// System V segments created by this package; mark the SysV segment for
	// removal via shmctl(IPC_RMID) instead.
	if _, _, errno := unix.Syscall(unix.SYS_SHMCTL, shmID, unix.IPC_RMID, 0); errno != 0 {
		return fmt.Errorf("remove shared memory failed:%w", errno)
	}
	return nil
}

View File

@ -1,77 +0,0 @@
package task
import (
	"encoding/json"

	"github.com/gofrs/uuid"
)
// Task priority levels; SetPriority clamps values into [LowPriority, HighPriority].
const (
	// LowPriority represents low priority tasks (minimum allowed).
	LowPriority = 1
	// DefaultPriority is the default task priority.
	DefaultPriority = 5
	// HighPriority represents high priority tasks (maximum allowed).
	HighPriority = 10
)
// TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch
// This struct is designed to be lightweight for efficient message transport
type TaskQueueMessage struct {
	TaskID   uuid.UUID `json:"task_id"`            // unique id of the task being dispatched
	TaskType TaskType  `json:"task_type"`          // business type of the task
	Priority int       `json:"priority,omitempty"` // Optional, defaults to DefaultPriority
}
// NewTaskQueueMessage builds a queue message for taskID and taskType carrying
// the default priority.
func NewTaskQueueMessage(taskID uuid.UUID, taskType TaskType) *TaskQueueMessage {
	msg := &TaskQueueMessage{}
	msg.TaskID = taskID
	msg.TaskType = taskType
	msg.Priority = DefaultPriority
	return msg
}
// NewTaskQueueMessageWithPriority builds a queue message for taskID and
// taskType with an explicit, unclamped priority value.
func NewTaskQueueMessageWithPriority(taskID uuid.UUID, taskType TaskType, priority int) *TaskQueueMessage {
	return &TaskQueueMessage{
		Priority: priority,
		TaskType: taskType,
		TaskID:   taskID,
	}
}
// ToJSON serializes the message to its JSON wire representation, honoring the
// struct's json tags.
func (m *TaskQueueMessage) ToJSON() ([]byte, error) {
	// BUG FIX: this previously returned an empty byte slice placeholder,
	// producing unusable queue payloads while reporting success.
	return json.Marshal(m)
}
// Validate reports whether the message carries a non-nil task id and one of
// the known task types.
func (m *TaskQueueMessage) Validate() bool {
	if m.TaskID == uuid.Nil {
		return false
	}
	switch m.TaskType {
	case TypeTopologyAnalysis, TypeEventAnalysis, TypeBatchImport:
		return true
	}
	return false
}
// SetPriority stores priority on the message after clamping it into the
// [LowPriority, HighPriority] range.
func (m *TaskQueueMessage) SetPriority(priority int) {
	clamped := priority
	switch {
	case clamped < LowPriority:
		clamped = LowPriority
	case clamped > HighPriority:
		clamped = HighPriority
	}
	m.Priority = clamped
}
// GetPriority returns the priority of the task queue message
// NOTE(review): Go convention would name this Priority() (no Get prefix);
// the name is kept for caller compatibility.
func (m *TaskQueueMessage) GetPriority() int {
	return m.Priority
}

View File

@ -1,55 +0,0 @@
package task
import (
"time"
)
// TaskStatus enumerates the lifecycle states of an async task.
type TaskStatus string

const (
	// StatusPending marks a task that is queued but not yet started.
	StatusPending TaskStatus = "PENDING"
	// StatusRunning marks a task currently being executed.
	StatusRunning TaskStatus = "RUNNING"
	// StatusCompleted marks a task that finished successfully.
	StatusCompleted TaskStatus = "COMPLETED"
	// StatusFailed marks a task that terminated with an error.
	StatusFailed TaskStatus = "FAILED"
)

// TaskType defines the concrete business type of an async task.
type TaskType string

const (
	// TypeTopologyAnalysis identifies topology analysis tasks.
	TypeTopologyAnalysis TaskType = "TOPOLOGY_ANALYSIS"
	// TypeEventAnalysis identifies event analysis tasks.
	TypeEventAnalysis TaskType = "EVENT_ANALYSIS"
	// TypeBatchImport identifies batch import tasks.
	TypeBatchImport TaskType = "BATCH_IMPORT"
)

// Task is the stored representation of an async task (bson tags) and its API
// projection (json tags).
type Task struct {
	ID          string                 `bson:"_id" json:"id"`                        // task identifier
	Type        TaskType               `bson:"type" json:"type"`                     // business type
	Status      TaskStatus             `bson:"status" json:"status"`                 // current lifecycle state
	Priority    int                    `bson:"priority" json:"priority"`             // dispatch priority
	Params      map[string]interface{} `bson:"params" json:"params"`                 // type-specific input parameters
	Result      map[string]interface{} `bson:"result,omitempty" json:"result"`       // output, set on completion
	ErrorMsg    string                 `bson:"error_msg,omitempty" json:"error_msg"` // failure reason, set on failure
	CreatedAt   time.Time              `bson:"created_at" json:"created_at"`
	StartedAt   time.Time              `bson:"started_at,omitempty" json:"started_at"`
	CompletedAt time.Time              `bson:"completed_at,omitempty" json:"completed_at"`
}

// TopologyParams carries the input parameters for a TypeTopologyAnalysis task.
type TopologyParams struct {
	CheckIsland  bool     `json:"check_island"`
	CheckShort   bool     `json:"check_short"`
	BaseModelIDs []string `json:"base_model_ids"`
}

// EventAnalysisParams carries the input parameters for a TypeEventAnalysis task.
type EventAnalysisParams struct {
	MotorID    string `json:"motor_id"`
	TriggerID  string `json:"trigger_id"`
	DurationMS int    `json:"duration_ms"`
}

// BatchImportParams carries the input parameters for a TypeBatchImport task.
type BatchImportParams struct {
	FileName string `json:"file_name"`
	FilePath string `json:"file_path"`
}

View File

@ -3,7 +3,6 @@ package util
import (
"context"
"errors"
"fmt"
"time"
@ -14,36 +13,25 @@ import (
)
// NewRedisClient define func of initialize the Redis client
func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
// default options
configs := &clientConfig{
Options: &redis.Options{
options := RedisOptions{
redisOptions: &redis.Options{
Addr: addr,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
},
}
// Apply configuration options from config
var errs []error
for _, opt := range opts {
if err := opt(configs); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
opt(&options)
}
// create redis client
client := redis.NewClient(configs.Options)
client := redis.NewClient(options.redisOptions)
if configs.DialTimeout > 0 {
if options.timeout > 0 {
// check if the connection is successful
ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), options.timeout)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("can not connect redis:%v", err)
@ -55,29 +43,22 @@ func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
// NewRedigoPool define func of initialize the Redigo pool
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
pool := &redigo.Pool{
// TODO optimize IdleTimeout with config parameter
MaxIdle: rCfg.PoolSize / 2,
MaxActive: rCfg.PoolSize,
// TODO optimize IdleTimeout with config parameter
IdleTimeout: 240 * time.Second,
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
// Dial function to create the connection
Dial: func() (redigo.Conn, error) {
dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒
opts := []redigo.DialOption{
redigo.DialDatabase(rCfg.DB),
redigo.DialPassword(rCfg.Password),
redigo.DialConnectTimeout(dialTimeout),
redigo.DialReadTimeout(readTimeout),
redigo.DialWriteTimeout(writeTimeout),
redigo.DialConnectTimeout(timeout),
// redigo.DialReadTimeout(timeout),
// redigo.DialWriteTimeout(timeout),
// TODO add redigo.DialUsername when redis open acl
// redis.DialUsername("username"),
}
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
@ -91,14 +72,13 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
conn := pool.Get()
defer conn.Close()
if err := conn.Err(); err != nil {
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
if conn.Err() != nil {
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err())
}
_, err := conn.Do("PING")
if err != nil {
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
}
return pool, nil
}

View File

@ -5,79 +5,56 @@ import (
"errors"
"time"
"modelRT/constants"
"github.com/redis/go-redis/v9"
)
type clientConfig struct {
*redis.Options
type RedisOptions struct {
redisOptions *redis.Options
timeout time.Duration
}
type Option func(*clientConfig) error
type RedisOption func(*RedisOptions) error
// WithPassword define func of configure redis password options
func WithPassword(password string, env string) Option {
return func(c *clientConfig) error {
if env == constants.ProductionDeployMode && password == "" {
func WithPassword(password string) RedisOption {
return func(o *RedisOptions) error {
if password == "" {
return errors.New("password is empty")
}
c.Password = password
o.redisOptions.Password = password
return nil
}
}
// WithConnectTimeout define func of configure redis connect timeout options
func WithConnectTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
// WithTimeout define func of configure redis timeout options
func WithTimeout(timeout time.Duration) RedisOption {
return func(o *RedisOptions) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.DialTimeout = timeout
return nil
}
}
// WithReadTimeout define func of configure redis read timeout options
func WithReadTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.ReadTimeout = timeout
return nil
}
}
// WithWriteTimeout define func of configure redis write timeout options
func WithWriteTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.WriteTimeout = timeout
o.timeout = timeout
return nil
}
}
// WithDB define func of configure redis db options
func WithDB(db int) Option {
return func(c *clientConfig) error {
func WithDB(db int) RedisOption {
return func(o *RedisOptions) error {
if db < 0 {
return errors.New("db can not be negative")
}
c.DB = db
o.redisOptions.DB = db
return nil
}
}
// WithPoolSize define func of configure pool size options
func WithPoolSize(poolSize int) Option {
return func(c *clientConfig) error {
func WithPoolSize(poolSize int) RedisOption {
return func(o *RedisOptions) error {
if poolSize <= 0 {
return errors.New("pool size must be greater than 0")
}
c.PoolSize = poolSize
o.redisOptions.PoolSize = poolSize
return nil
}
}