Compare commits

..

No commits in common. "feature/bay-realtime-data-calc" and "develop" have entirely different histories.

21 changed files with 301 additions and 324 deletions

View File

@ -16,12 +16,6 @@ var (
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
// ErrSubTargetRepeat define variable to indicates subscription target already exist in list
ErrSubTargetRepeat = newError(40005, "subscription target already exist in list")
// ErrSubTargetNotFound define variable to indicates can not find measurement by subscription target
ErrSubTargetNotFound = newError(40006, "found measuremnet by subscription target failed")
// ErrCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
ErrCancelSubTargetMissing = newError(40007, "cancel a not exist subscription target")
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
ErrDBQueryFailed = newError(50001, "query postgres database data failed")

View File

@ -44,11 +44,10 @@ var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
if componentType == constants.DemoType {
switch anchorName {
case "voltage":
if anchorName == "voltage" {
resistance := componentData["resistance"].(float64)
return baseVoltageFunc, []float64{resistance}
case "current":
} else if anchorName == "current" {
resistance := componentData["resistance"].(float64)
return baseCurrentFunc, []float64{resistance}
}

View File

@ -19,7 +19,6 @@ type ServiceConfig struct {
ServiceAddr string `mapstructure:"service_addr"`
ServiceName string `mapstructure:"service_name"`
SecretKey string `mapstructure:"secret_key"`
DeployEnv string `mapstructure:"deploy_env"`
}
// KafkaConfig define config struct of kafka config
@ -54,13 +53,11 @@ type LoggerConfig struct {
// RedisConfig define config struct of redis config
type RedisConfig struct {
Addr string `mapstructure:"addr"`
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
PoolSize int `mapstructure:"poolsize"`
DialTimeout int `mapstructure:"dial_timeout"`
ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"`
Addr string `mapstructure:"addr"`
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
PoolSize int `mapstructure:"poolsize"`
Timeout int `mapstructure:"timeout"`
}
// AntsConfig define config struct of ants pool config

View File

@ -0,0 +1,17 @@
// Package constants define constant variable
package constants
const (
// CodeSuccess define constant to indicates that the API was successfully processed
CodeSuccess = 20000
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
CodeInvalidParamFailed = 40001
// CodeDBQueryFailed define constant to indicates database query operation failed
CodeDBQueryFailed = 50001
// CodeDBUpdateailed define constant to indicates database update operation failed
CodeDBUpdateailed = 50002
// CodeRedisQueryFailed define constant to indicates redis query operation failed
CodeRedisQueryFailed = 60001
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
CodeRedisUpdateFailed = 60002
)

View File

@ -1,31 +0,0 @@
// Package constants define constant variable
package constants
const (
// CodeSuccess define constant to indicates that the API was successfully processed
CodeSuccess = 20000
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
CodeInvalidParamFailed = 40001
// CodeFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
CodeFoundTargetFailed = 40004
// CodeSubTargetRepeat define variable to indicates subscription target already exist in list
CodeSubTargetRepeat = 40005
// CodeSubTargetNotFound define variable to indicates can not find measurement by subscription target
CodeSubTargetNotFound = 40006
// CodeCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
CodeCancelSubTargetMissing = 40007
// CodeUpdateSubTargetMissing define variable to indicates update a not exist subscription target
CodeUpdateSubTargetMissing = 40008
// CodeAppendSubTargetMissing define variable to indicates append a not exist subscription target
CodeAppendSubTargetMissing = 40009
// CodeUnsupportSubOperation define variable to indicates append a not exist subscription target
CodeUnsupportSubOperation = 40010
// CodeDBQueryFailed define constant to indicates database query operation failed
CodeDBQueryFailed = 50001
// CodeDBUpdateailed define constant to indicates database update operation failed
CodeDBUpdateailed = 50002
// CodeRedisQueryFailed define constant to indicates redis query operation failed
CodeRedisQueryFailed = 60001
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
CodeRedisUpdateFailed = 60002
)

View File

@ -1,11 +0,0 @@
// Package constants define constant variable
package constants
const (
// DevelopmentDeployMode define development operator environment for modelRT project
DevelopmentDeployMode = "development"
// DebugDeployMode define debug operator environment for modelRT project
DebugDeployMode = "debug"
// ProductionDeployMode define production operator environment for modelRT project
ProductionDeployMode = "production"
)

View File

@ -12,6 +12,29 @@ const (
SubUpdateAction string = "update"
)
// 定义状态常量
// TODO 从4位格式修改为5位格式
const (
// SubSuccessCode define subscription success code
SubSuccessCode = "1001"
// SubFailedCode define subscription failed code
SubFailedCode = "1002"
// RTDSuccessCode define real time data return success code
RTDSuccessCode = "1003"
// RTDFailedCode define real time data return failed code
RTDFailedCode = "1004"
// CancelSubSuccessCode define cancel subscription success code
CancelSubSuccessCode = "1005"
// CancelSubFailedCode define cancel subscription failed code
CancelSubFailedCode = "1006"
// SubRepeatCode define subscription repeat code
SubRepeatCode = "1007"
// UpdateSubSuccessCode define update subscription success code
UpdateSubSuccessCode = "1008"
// UpdateSubFailedCode define update subscription failed code
UpdateSubFailedCode = "1009"
)
const (
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
SysCtrlPrefix = "SYS_CTRL_"

View File

@ -53,8 +53,7 @@ func FillingLongTokenModel(ctx context.Context, tx *gorm.DB, identModel *model.L
func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.IndentityTokenModelInterface, error) {
identSlice := strings.Split(identToken, ".")
identSliceLen := len(identSlice)
switch identSliceLen {
case 4:
if identSliceLen == 4 {
// token1.token2.token3.token4.token7
shortIndentModel := &model.ShortIdentityTokenModel{
GridTag: identSlice[0],
@ -68,7 +67,7 @@ func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken strin
return nil, err
}
return shortIndentModel, nil
case 7:
} else if identSliceLen == 7 {
// token1.token2.token3.token4.token5.token6.token7
longIndentModel := &model.LongIdentityTokenModel{
GridTag: identSlice[0],

View File

@ -19,8 +19,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
attrSlice := strings.Split(attrToken, ".")
attrLen := len(attrSlice)
switch attrLen {
case 4:
if attrLen == 4 {
short := &model.ShortAttrInfo{
AttrGroupName: attrSlice[2],
AttrKey: attrSlice[3],
@ -36,7 +35,7 @@ func ParseAttrToken(ctx context.Context, tx *gorm.DB, attrToken, clientToken str
}
short.AttrValue = attrValue
return short, nil
case 7:
} else if attrLen == 7 {
long := &model.LongAttrInfo{
AttrGroupName: attrSlice[5],
AttrKey: attrSlice[6],

View File

@ -2,7 +2,9 @@
package database
import (
"context"
"sync"
"time"
"modelRT/logger"
@ -25,15 +27,17 @@ func GetPostgresDBClient() *gorm.DB {
}
// InitPostgresDBInstance return instance of PostgresDB client
func InitPostgresDBInstance(PostgresDBURI string) *gorm.DB {
func InitPostgresDBInstance(ctx context.Context, PostgresDBURI string) *gorm.DB {
postgresOnce.Do(func() {
_globalPostgresClient = initPostgresDBClient(PostgresDBURI)
_globalPostgresClient = initPostgresDBClient(ctx, PostgresDBURI)
})
return _globalPostgresClient
}
// initPostgresDBClient return successfully initialized PostgresDB client
func initPostgresDBClient(PostgresDBURI string) *gorm.DB {
func initPostgresDBClient(ctx context.Context, PostgresDBURI string) *gorm.DB {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
db, err := gorm.Open(postgres.Open(PostgresDBURI), &gorm.Config{Logger: logger.NewGormLogger()})
if err != nil {
panic(err)

View File

@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
}
}
// QueryByZRange define func to query real time data from redis zset
func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
// QueryByZRangeByLex define func to query real time data from redis zset
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) {
client := rc.Client
args := redis.ZRangeArgs{
Key: key,

View File

@ -16,15 +16,13 @@ var (
)
// initClient define func of return successfully initialized redis client
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func initClient(rCfg config.RedisConfig) *redis.Client {
client, err := util.NewRedisClient(
rCfg.Addr,
util.WithPassword(rCfg.Password, deployEnv),
util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize),
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
)
if err != nil {
panic(err)
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
}
// InitRedisClientInstance define func of return instance of redis client
func InitRedisClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func InitRedisClientInstance(rCfg config.RedisConfig) *redis.Client {
once.Do(func() {
_globalStorageClient = initClient(rCfg, deployEnv)
_globalStorageClient = initClient(rCfg)
})
return _globalStorageClient
}

View File

@ -16,15 +16,13 @@ var (
)
// initClient define func of return successfully initialized redis client
func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func initClient(rCfg config.RedisConfig) *redis.Client {
client, err := util.NewRedisClient(
rCfg.Addr,
util.WithPassword(rCfg.Password, deployEnv),
util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize),
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
)
if err != nil {
panic(err)
@ -33,9 +31,9 @@ func initClient(rCfg config.RedisConfig, deployEnv string) *redis.Client {
}
// InitClientInstance define func of return instance of redis client
func InitClientInstance(rCfg config.RedisConfig, deployEnv string) *redis.Client {
func InitClientInstance(rCfg config.RedisConfig) *redis.Client {
once.Do(func() {
_globalLockerClient = initClient(rCfg, deployEnv)
_globalLockerClient = initClient(rCfg)
})
return _globalLockerClient
}

View File

@ -30,14 +30,3 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
}
c.JSON(http.StatusOK, resp)
}
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
resp := network.WSResponse{
Code: code,
Msg: msg,
}
if payload != nil {
resp.Payload = payload
}
c.JSON(http.StatusOK, resp)
}

View File

@ -39,14 +39,20 @@ func PullRealTimeDataHandler(c *gin.Context) {
if clientID == "" {
err := fmt.Errorf("clientID is missing from the path")
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
defer conn.Close()
@ -54,18 +60,9 @@ func PullRealTimeDataHandler(c *gin.Context) {
ctx, cancel := context.WithCancel(c.Request.Context())
defer cancel()
conn.SetCloseHandler(func(code int, text string) error {
logger.Info(c.Request.Context(), "websocket processor shutdown trigger",
"clientID", clientID, "code", code, "reason", text)
// call cancel to notify other goroutines to stop working
cancel()
return nil
})
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
go readClientMessages(ctx, conn, clientID, cancel)
@ -82,33 +79,52 @@ func PullRealTimeDataHandler(c *gin.Context) {
select {
case targetData, ok := <-fanInChan:
if !ok {
sendChan <- network.WSResponse{
Code: constants.RespCodeServerError,
Msg: "abnormal shutdown of data fan-in channel",
}
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
return
}
buffer = append(buffer, targetData)
if len(buffer) >= bufferMaxSize {
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
// buffer is full, send immediately
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
// reset the ticker to prevent it from triggering immediately after the ticker is sent
ticker.Reset(sendMaxInterval)
}
case <-ticker.C:
if len(buffer) > 0 {
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
// when the ticker is triggered, all data in the send buffer is sent
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
}
// reset buffer
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
}
case <-ctx.Done():
// last refresh before exiting
// send the last remaining data
if len(buffer) > 0 {
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
select {
case sendChan <- buffer:
default:
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
}
}
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
return
}
}
}
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
// conn.SetReadLimit(512)
for {
@ -133,47 +149,54 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
}
}
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
if len(*buffer) == 0 {
return
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
if len(targetsData) == 0 {
return nil
}
resp := network.WSResponse{
Code: constants.RespCodeSuccess,
Msg: "process completed",
response := network.SuccessResponse{
Code: 200,
Msg: "success",
Payload: network.RealTimePullPayload{
Targets: *buffer,
Targets: targetsData,
},
}
select {
case sendChan <- resp:
default:
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
}
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
return conn.WriteJSON(response)
}
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
defer func() {
if r := recover(); r != nil {
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
}
}()
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
for resp := range sendChan {
if err := conn.WriteJSON(resp); err != nil {
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
for targetsData := range sendChan {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
err := conn.WriteJSON(map[string]any{
"code": 2101,
"msg": "all targets removed in given client_id",
"payload": map[string]int{
"active_targets_count": 0,
},
})
if err != nil {
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
cancel()
}
continue
}
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
cancel()
return
}
}
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
}
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
// ensure the fanInChan will not leak
defer close(fanInChan)
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
stopChanMap := make(map[string]chan struct{})
s.globalMutex.RLock()
@ -360,7 +383,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
}
// removeTargets define func to stops running polling goroutines for targets that were removed
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
for _, target := range removeTargets {
stopChan, exists := stopChanMap[target]
if !exists {
@ -379,18 +402,17 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
}
}
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
resp := network.WSResponse{
Code: constants.RespCodeSuccessWithNoSub,
Msg: "all targets removed",
Payload: map[string]int{"active_targets_count": 0},
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
specialTarget := network.RealTimePullTarget{
ID: constants.SysCtrlAllRemoved,
Datas: []network.RealTimePullData{},
}
select {
case sendChan <- resp:
case sendChan <- []network.RealTimePullTarget{specialTarget}:
logger.Info(ctx, "sent 2101 status request to sendChan")
default:
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
}
}
@ -441,7 +463,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
}
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
return

View File

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"maps"
"net/http"
"sync"
"modelRT/constants"
@ -32,42 +33,42 @@ func init() {
// @Accept json
// @Produce json
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
// @Success 2000 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
//
// @Example 2000 {
// "code": 2000,
// "msg": "process completed",
// @Example 200 {
// "code": 200,
// "msg": "success",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
// "code": "20000",
// "code": "1001",
// "msg": "subscription success"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
// "code": "20000",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
// }
// }
//
// @Failure 3000 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
//
// @Example 3000 {
// "code": 3000,
// "msg": "process completed with partial failures",
// @Example 400 {
// "code": 400,
// "msg": "failed to get recommend data from redis",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
// "code": "40005",
// "code": "1002",
// "msg": "subscription failed"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
// "code": "50001",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
@ -82,7 +83,10 @@ func RealTimeSubHandler(c *gin.Context) {
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(c, "failed to unmarshal real time query request", "error", err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
@ -91,7 +95,10 @@ func RealTimeSubHandler(c *gin.Context) {
id, err := uuid.NewV4()
if err != nil {
logger.Error(c, "failed to generate client id", "error", err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
clientID = id.String()
@ -116,74 +123,110 @@ func RealTimeSubHandler(c *gin.Context) {
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "create real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubStopAction:
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
if err != nil {
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubAppendAction:
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "append target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
case constants.SubUpdateAction:
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
if err != nil {
logger.Error(c, "update target to real time data subscription config failed", "error", err)
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
default:
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: network.RealTimeSubPayload{
ClientID: clientID,
TargetResults: results,
},
})
return
}
@ -240,12 +283,12 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.CodeFoundTargetFailed
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.CodeSuccess
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
@ -284,7 +327,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
if _, exist := config.targetContext[target]; !exist {
err := fmt.Errorf("target %s does not exists in subscription list", target)
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
targetResult.Code = constants.CodeUpdateSubTargetMissing
targetResult.Code = constants.UpdateSubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
@ -293,13 +336,13 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.CodeDBQueryFailed
targetResult.Code = constants.UpdateSubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.CodeSuccess
targetResult.Code = constants.UpdateSubSuccessCode
targetResult.Msg = constants.UpdateSubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
successfulTargets = append(successfulTargets, target)
@ -430,7 +473,7 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
if !exist {
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeAppendSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
@ -464,7 +507,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS
for index := range resultsSlice {
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
resultsSlice[index].Code = constants.CodeSubTargetRepeat
resultsSlice[index].Code = constants.SubRepeatCode
resultsSlice[index].Msg = constants.SubRepeatMsg
}
}
@ -532,7 +575,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeCancelSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
s.globalMutex.RUnlock()
@ -552,7 +595,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
for _, target := range measTargets {
targetResult := network.TargetResult{
ID: target,
Code: constants.CodeCancelSubTargetMissing,
Code: constants.CancelSubFailedCode,
Msg: constants.CancelSubFailedMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -573,7 +616,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
targetResult := network.TargetResult{
ID: existingTarget,
Code: constants.CodeSuccess,
Code: constants.CancelSubSuccessCode,
Msg: constants.CancelSubSuccessMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -596,7 +639,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
for target := range targetsToRemoveMap {
targetResult := network.TargetResult{
ID: target,
Code: constants.CodeCancelSubTargetMissing,
Code: constants.CancelSubFailedCode,
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
}
targetProcessResults = append(targetProcessResults, targetResult)
@ -627,9 +670,10 @@ func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientI
s.globalMutex.RUnlock()
if !exist {
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
}
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
@ -678,13 +722,13 @@ func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem)
return totalTargetsCount
}
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, businessCode int, err error) []network.TargetResult {
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
targetProcessResults := make([]network.TargetResult, 0, targetCount)
for _, measurementItem := range measurements {
for _, target := range measurementItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetResult.Code = businessCode
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
}

21
main.go
View File

@ -15,7 +15,6 @@ import (
"modelRT/alert"
"modelRT/config"
"modelRT/constants"
"modelRT/database"
"modelRT/diagram"
"modelRT/logger"
@ -99,14 +98,14 @@ func main() {
panic(err)
}
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceName, modelRTConfig.SecretKey)
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceConfig.ServiceName, modelRTConfig.ServiceConfig.SecretKey)
if err != nil {
logger.Error(ctx, "generate client token failed", "error", err)
panic(err)
}
// init postgresDBClient
postgresDBClient = database.InitPostgresDBInstance(modelRTConfig.PostgresDBURI)
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
defer func() {
sqlDB, err := postgresDBClient.DB()
@ -128,17 +127,13 @@ func main() {
defer parsePool.Release()
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
if err != nil {
logger.Error(ctx, "init redigo pool failed", "error", err)
panic(err)
}
defer searchPool.Close()
model.InitAutocompleterWithPool(searchPool)
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig, modelRTConfig.DeployEnv)
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig)
defer storageClient.Close()
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig, modelRTConfig.DeployEnv)
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig)
defer lockerClient.Close()
// init anchor param ants pool
@ -209,10 +204,8 @@ func main() {
return nil
})
// use release mode in production
if modelRTConfig.DeployEnv == constants.ProductionDeployMode {
gin.SetMode(gin.ReleaseMode)
}
// use release mode in productio
// gin.SetMode(gin.ReleaseMode)
engine := gin.New()
router.RegisterRoutes(engine, serviceToken)
@ -230,7 +223,7 @@ func main() {
// }
server := http.Server{
Addr: modelRTConfig.ServiceAddr,
Addr: modelRTConfig.ServiceConfig.ServiceAddr,
Handler: engine,
}

View File

@ -3,25 +3,18 @@ package network
// FailureResponse define struct of standard failure API response format
type FailureResponse struct {
Code int `json:"code" example:"3000"`
Msg string `json:"msg" example:"process completed with partial failures"`
Code int `json:"code" example:"500"`
Msg string `json:"msg" example:"failed to get recommend data from redis"`
Payload any `json:"payload" swaggertype:"object"`
}
// SuccessResponse define struct of standard successful API response format
type SuccessResponse struct {
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Code int `json:"code" example:"200"`
Msg string `json:"msg" example:"success"`
Payload any `json:"payload" swaggertype:"object"`
}
// WSResponse define struct of standard websocket API response format
type WSResponse struct {
Code int `json:"code" example:"2000"`
Msg string `json:"msg" example:"process completed"`
Payload any `json:"payload,omitempty" swaggertype:"object"`
}
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
type MeasurementRecommendPayload struct {
Input string `json:"input" example:"transformfeeder1_220."`
@ -33,7 +26,7 @@ type MeasurementRecommendPayload struct {
// TargetResult define struct of target item in real time data subscription response payload
type TargetResult struct {
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
Code int `json:"code" example:"20000"`
Code string `json:"code" example:"1001"`
Msg string `json:"msg" example:"subscription success"`
}

View File

@ -205,20 +205,13 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) {
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
return
case <-ticker.C:
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
cancel()
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize)
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
continue
}
realTimedatas := util.ConvertZSetMembersToFloat64(members)
if len(realTimedatas) == 0 {
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
continue
}
if conf.Analyzer != nil {
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
} else {

View File

@ -3,7 +3,6 @@ package util
import (
"context"
"errors"
"fmt"
"time"
@ -14,36 +13,25 @@ import (
)
// NewRedisClient define func of initialize the Redis client
func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
// default options
configs := &clientConfig{
Options: &redis.Options{
Addr: addr,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
options := RedisOptions{
redisOptions: &redis.Options{
Addr: addr,
},
}
// Apply configuration options from config
var errs []error
for _, opt := range opts {
if err := opt(configs); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
opt(&options)
}
// create redis client
client := redis.NewClient(configs.Options)
client := redis.NewClient(options.redisOptions)
if configs.DialTimeout > 0 {
if options.timeout > 0 {
// check if the connection is successful
ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), options.timeout)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("can not connect redis:%v", err)
@ -55,29 +43,22 @@ func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
// NewRedigoPool define func of initialize the Redigo pool
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
pool := &redigo.Pool{
MaxIdle: rCfg.PoolSize / 2,
MaxActive: rCfg.PoolSize,
// TODO optimize IdleTimeout with config parameter
MaxIdle: rCfg.PoolSize / 2,
MaxActive: rCfg.PoolSize,
IdleTimeout: 240 * time.Second,
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
// Dial function to create the connection
Dial: func() (redigo.Conn, error) {
dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒
opts := []redigo.DialOption{
redigo.DialDatabase(rCfg.DB),
redigo.DialPassword(rCfg.Password),
redigo.DialConnectTimeout(dialTimeout),
redigo.DialReadTimeout(readTimeout),
redigo.DialWriteTimeout(writeTimeout),
redigo.DialConnectTimeout(timeout),
// redigo.DialReadTimeout(timeout),
// redigo.DialWriteTimeout(timeout),
// TODO add redigo.DialUsername when redis open acl
// redis.DialUsername("username"),
}
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
@ -91,14 +72,13 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
conn := pool.Get()
defer conn.Close()
if err := conn.Err(); err != nil {
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
if conn.Err() != nil {
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err())
}
_, err := conn.Do("PING")
if err != nil {
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
}
return pool, nil
}

View File

@ -5,79 +5,56 @@ import (
"errors"
"time"
"modelRT/constants"
"github.com/redis/go-redis/v9"
)
type clientConfig struct {
*redis.Options
type RedisOptions struct {
redisOptions *redis.Options
timeout time.Duration
}
type Option func(*clientConfig) error
type RedisOption func(*RedisOptions) error
// WithPassword define func of configure redis password options
func WithPassword(password string, env string) Option {
return func(c *clientConfig) error {
if env == constants.ProductionDeployMode && password == "" {
func WithPassword(password string) RedisOption {
return func(o *RedisOptions) error {
if password == "" {
return errors.New("password is empty")
}
c.Password = password
o.redisOptions.Password = password
return nil
}
}
// WithConnectTimeout define func of configure redis connect timeout options
func WithConnectTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
// WithTimeout define func of configure redis timeout options
func WithTimeout(timeout time.Duration) RedisOption {
return func(o *RedisOptions) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.DialTimeout = timeout
return nil
}
}
// WithReadTimeout define func of configure redis read timeout options
func WithReadTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.ReadTimeout = timeout
return nil
}
}
// WithWriteTimeout define func of configure redis write timeout options
func WithWriteTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.WriteTimeout = timeout
o.timeout = timeout
return nil
}
}
// WithDB define func of configure redis db options
func WithDB(db int) Option {
return func(c *clientConfig) error {
func WithDB(db int) RedisOption {
return func(o *RedisOptions) error {
if db < 0 {
return errors.New("db can not be negative")
}
c.DB = db
o.redisOptions.DB = db
return nil
}
}
// WithPoolSize define func of configure pool size options
func WithPoolSize(poolSize int) Option {
return func(c *clientConfig) error {
func WithPoolSize(poolSize int) RedisOption {
return func(o *RedisOptions) error {
if poolSize <= 0 {
return errors.New("pool size must be greater than 0")
}
c.PoolSize = poolSize
o.redisOptions.PoolSize = poolSize
return nil
}
}