Compare commits
4 Commits
develop
...
feature/ba
| Author | SHA1 | Date |
|---|---|---|
|
|
3ff29cc072 | |
|
|
617d21500e | |
|
|
1a1727adab | |
|
|
fd2b202037 |
|
|
@ -16,6 +16,12 @@ var (
|
|||
|
||||
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
||||
// ErrSubTargetRepeat define variable to indicates subscription target already exist in list
|
||||
ErrSubTargetRepeat = newError(40005, "subscription target already exist in list")
|
||||
// ErrSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
||||
ErrSubTargetNotFound = newError(40006, "found measuremnet by subscription target failed")
|
||||
// ErrCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
||||
ErrCancelSubTargetMissing = newError(40007, "cancel a not exist subscription target")
|
||||
|
||||
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
|
||||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||
|
|
|
|||
|
|
@ -53,11 +53,13 @@ type LoggerConfig struct {
|
|||
|
||||
// RedisConfig define config struct of redis config
|
||||
type RedisConfig struct {
|
||||
Addr string `mapstructure:"addr"`
|
||||
Password string `mapstructure:"password"`
|
||||
DB int `mapstructure:"db"`
|
||||
PoolSize int `mapstructure:"poolsize"`
|
||||
Timeout int `mapstructure:"timeout"`
|
||||
Addr string `mapstructure:"addr"`
|
||||
Password string `mapstructure:"password"`
|
||||
DB int `mapstructure:"db"`
|
||||
PoolSize int `mapstructure:"poolsize"`
|
||||
DialTimeout int `mapstructure:"dial_timeout"`
|
||||
ReadTimeout int `mapstructure:"read_timeout"`
|
||||
WriteTimeout int `mapstructure:"write_timeout"`
|
||||
}
|
||||
|
||||
// AntsConfig define config struct of ants pool config
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// CodeSuccess define constant to indicates that the API was successfully processed
|
||||
CodeSuccess = 20000
|
||||
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
||||
CodeInvalidParamFailed = 40001
|
||||
// CodeDBQueryFailed define constant to indicates database query operation failed
|
||||
CodeDBQueryFailed = 50001
|
||||
// CodeDBUpdateailed define constant to indicates database update operation failed
|
||||
CodeDBUpdateailed = 50002
|
||||
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
||||
CodeRedisQueryFailed = 60001
|
||||
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
||||
CodeRedisUpdateFailed = 60002
|
||||
)
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// CodeSuccess define constant to indicates that the API was successfully processed
|
||||
CodeSuccess = 20000
|
||||
// CodeInvalidParamFailed define constant to indicates request parameter parsing failed
|
||||
CodeInvalidParamFailed = 40001
|
||||
// CodeFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||
CodeFoundTargetFailed = 40004
|
||||
// CodeSubTargetRepeat define variable to indicates subscription target already exist in list
|
||||
CodeSubTargetRepeat = 40005
|
||||
// CodeSubTargetNotFound define variable to indicates can not find measurement by subscription target
|
||||
CodeSubTargetNotFound = 40006
|
||||
// CodeCancelSubTargetMissing define variable to indicates cancel a not exist subscription target
|
||||
CodeCancelSubTargetMissing = 40007
|
||||
// CodeUpdateSubTargetMissing define variable to indicates update a not exist subscription target
|
||||
CodeUpdateSubTargetMissing = 40008
|
||||
// CodeAppendSubTargetMissing define variable to indicates append a not exist subscription target
|
||||
CodeAppendSubTargetMissing = 40009
|
||||
// CodeUnsupportSubOperation define variable to indicates append a not exist subscription target
|
||||
CodeUnsupportSubOperation = 40010
|
||||
// CodeDBQueryFailed define constant to indicates database query operation failed
|
||||
CodeDBQueryFailed = 50001
|
||||
// CodeDBUpdateailed define constant to indicates database update operation failed
|
||||
CodeDBUpdateailed = 50002
|
||||
// CodeRedisQueryFailed define constant to indicates redis query operation failed
|
||||
CodeRedisQueryFailed = 60001
|
||||
// CodeRedisUpdateFailed define constant to indicates redis update operation failed
|
||||
CodeRedisUpdateFailed = 60002
|
||||
)
|
||||
|
|
@ -12,29 +12,6 @@ const (
|
|||
SubUpdateAction string = "update"
|
||||
)
|
||||
|
||||
// 定义状态常量
|
||||
// TODO 从4位格式修改为5位格式
|
||||
const (
|
||||
// SubSuccessCode define subscription success code
|
||||
SubSuccessCode = "1001"
|
||||
// SubFailedCode define subscription failed code
|
||||
SubFailedCode = "1002"
|
||||
// RTDSuccessCode define real time data return success code
|
||||
RTDSuccessCode = "1003"
|
||||
// RTDFailedCode define real time data return failed code
|
||||
RTDFailedCode = "1004"
|
||||
// CancelSubSuccessCode define cancel subscription success code
|
||||
CancelSubSuccessCode = "1005"
|
||||
// CancelSubFailedCode define cancel subscription failed code
|
||||
CancelSubFailedCode = "1006"
|
||||
// SubRepeatCode define subscription repeat code
|
||||
SubRepeatCode = "1007"
|
||||
// UpdateSubSuccessCode define update subscription success code
|
||||
UpdateSubSuccessCode = "1008"
|
||||
// UpdateSubFailedCode define update subscription failed code
|
||||
UpdateSubFailedCode = "1009"
|
||||
)
|
||||
|
||||
const (
|
||||
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
|
||||
SysCtrlPrefix = "SYS_CTRL_"
|
||||
|
|
@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
|
|||
}
|
||||
}
|
||||
|
||||
// QueryByZRangeByLex define func to query real time data from redis zset
|
||||
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
||||
// QueryByZRange define func to query real time data from redis zset
|
||||
func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
|
||||
client := rc.Client
|
||||
args := redis.ZRangeArgs{
|
||||
Key: key,
|
||||
|
|
|
|||
|
|
@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
|
|||
util.WithPassword(rCfg.Password),
|
||||
util.WithDB(rCfg.DB),
|
||||
util.WithPoolSize(rCfg.PoolSize),
|
||||
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
|
|||
util.WithPassword(rCfg.Password),
|
||||
util.WithDB(rCfg.DB),
|
||||
util.WithPoolSize(rCfg.PoolSize),
|
||||
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second),
|
||||
util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
|
||||
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
|
||||
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
|||
|
|
@ -30,3 +30,14 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
|
|||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func renderWSRespFailure(c *gin.Context, code int, msg string, payload any) {
|
||||
resp := network.WSResponse{
|
||||
Code: code,
|
||||
Msg: msg,
|
||||
}
|
||||
if payload != nil {
|
||||
resp.Payload = payload
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,20 +39,14 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
if clientID == "" {
|
||||
err := fmt.Errorf("clientID is missing from the path")
|
||||
logger.Error(c, "query clientID from path failed", "error", err, "url", c.Request.RequestURI)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderWSRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := pullUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
logger.Error(c, "upgrade http protocol to websocket protocol failed", "error", err)
|
||||
renderWSRespFailure(c, constants.RespCodeServerError, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
|
@ -60,9 +54,18 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||
defer cancel()
|
||||
|
||||
conn.SetCloseHandler(func(code int, text string) error {
|
||||
logger.Info(c.Request.Context(), "websocket processor shutdown trigger",
|
||||
"clientID", clientID, "code", code, "reason", text)
|
||||
|
||||
// call cancel to notify other goroutines to stop working
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
// TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1
|
||||
fanInChan := make(chan network.RealTimePullTarget, constants.FanInChanMaxSize)
|
||||
sendChan := make(chan []network.RealTimePullTarget, constants.SendChanBufferSize)
|
||||
sendChan := make(chan network.WSResponse, constants.SendChanBufferSize)
|
||||
|
||||
go processTargetPolling(ctx, globalSubState, clientID, fanInChan, sendChan)
|
||||
go readClientMessages(ctx, conn, clientID, cancel)
|
||||
|
|
@ -79,52 +82,33 @@ func PullRealTimeDataHandler(c *gin.Context) {
|
|||
select {
|
||||
case targetData, ok := <-fanInChan:
|
||||
if !ok {
|
||||
logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID)
|
||||
sendChan <- network.WSResponse{
|
||||
Code: constants.RespCodeServerError,
|
||||
Msg: "abnormal shutdown of data fan-in channel",
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
buffer = append(buffer, targetData)
|
||||
|
||||
if len(buffer) >= bufferMaxSize {
|
||||
// buffer is full, send immediately
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (buffer is full)", "client_id", clientID)
|
||||
}
|
||||
|
||||
// reset buffer
|
||||
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||
// reset the ticker to prevent it from triggering immediately after the ticker is sent
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "buffer_full")
|
||||
ticker.Reset(sendMaxInterval)
|
||||
}
|
||||
case <-ticker.C:
|
||||
if len(buffer) > 0 {
|
||||
// when the ticker is triggered, all data in the send buffer is sent
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, dropping aggregated data batch (ticker is triggered)", "client_id", clientID)
|
||||
}
|
||||
|
||||
// reset buffer
|
||||
buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize)
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "ticker_timeout")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// send the last remaining data
|
||||
// last refresh before exiting
|
||||
if len(buffer) > 0 {
|
||||
select {
|
||||
case sendChan <- buffer:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, cannot send last remaining data during shutdown.", "client_id", clientID)
|
||||
}
|
||||
flushBuffer(ctx, &buffer, sendChan, clientID, "shutdown")
|
||||
}
|
||||
logger.Info(ctx, "pullRealTimeDataHandler exiting as context is done.", "client_id", clientID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readClientMessages 负责持续监听客户端发送的消息(例如 Ping/Pong, Close Frame, 或控制命令)
|
||||
// readClientMessages define func to responsible for continuously listening for messages sent by clients (such as Ping/Pong, Close Frame, or control commands)
|
||||
func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID string, cancel context.CancelFunc) {
|
||||
// conn.SetReadLimit(512)
|
||||
for {
|
||||
|
|
@ -149,54 +133,47 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri
|
|||
}
|
||||
}
|
||||
|
||||
// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client
|
||||
func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error {
|
||||
if len(targetsData) == 0 {
|
||||
return nil
|
||||
func flushBuffer(ctx context.Context, buffer *[]network.RealTimePullTarget, sendChan chan<- network.WSResponse, clientID string, reason string) {
|
||||
if len(*buffer) == 0 {
|
||||
return
|
||||
}
|
||||
response := network.SuccessResponse{
|
||||
Code: 200,
|
||||
Msg: "success",
|
||||
|
||||
resp := network.WSResponse{
|
||||
Code: constants.RespCodeSuccess,
|
||||
Msg: "process completed",
|
||||
Payload: network.RealTimePullPayload{
|
||||
Targets: targetsData,
|
||||
Targets: *buffer,
|
||||
},
|
||||
}
|
||||
return conn.WriteJSON(response)
|
||||
|
||||
select {
|
||||
case sendChan <- resp:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan blocked, dropping data batch", "client_id", clientID, "reason", reason)
|
||||
}
|
||||
*buffer = make([]network.RealTimePullTarget, 0, constants.SendMaxBatchSize)
|
||||
}
|
||||
|
||||
// sendDataStream define func to manages a dedicated goroutine to push data batches or system signals to the websocket client
|
||||
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan []network.RealTimePullTarget, cancel context.CancelFunc) {
|
||||
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
||||
for targetsData := range sendChan {
|
||||
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
||||
if len(targetsData) == 1 && targetsData[0].ID == constants.SysCtrlAllRemoved {
|
||||
err := conn.WriteJSON(map[string]any{
|
||||
"code": 2101,
|
||||
"msg": "all targets removed in given client_id",
|
||||
"payload": map[string]int{
|
||||
"active_targets_count": 0,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(ctx, "send all targets removed system signal failed", "client_id", clientID, "error", err)
|
||||
cancel()
|
||||
}
|
||||
continue
|
||||
func sendDataStream(ctx context.Context, conn *websocket.Conn, clientID string, sendChan <-chan network.WSResponse, cancel context.CancelFunc) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Error(ctx, "sendDataStream recovered from panic", "err", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := sendAggregateRealTimeDataStream(conn, targetsData); err != nil {
|
||||
logger.Error(ctx, "send the real time aggregate data failed in sender goroutine", "client_id", clientID, "error", err)
|
||||
logger.Info(ctx, "start dedicated websocket sender goroutine", "client_id", clientID)
|
||||
for resp := range sendChan {
|
||||
if err := conn.WriteJSON(resp); err != nil {
|
||||
logger.Error(ctx, "websocket write failed", "client_id", clientID, "error", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
logger.Info(ctx, "sender goroutine exiting as channel is closed", "client_id", clientID)
|
||||
}
|
||||
|
||||
// processTargetPolling define func to process target in subscription map and data is continuously retrieved from redis based on the target
|
||||
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- []network.RealTimePullTarget) {
|
||||
// ensure the fanInChan will not leak
|
||||
defer close(fanInChan)
|
||||
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget, sendChan chan<- network.WSResponse) {
|
||||
logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID))
|
||||
stopChanMap := make(map[string]chan struct{})
|
||||
s.globalMutex.RLock()
|
||||
|
|
@ -383,7 +360,7 @@ func updateTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap m
|
|||
}
|
||||
|
||||
// removeTargets define func to stops running polling goroutines for targets that were removed
|
||||
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- []network.RealTimePullTarget) {
|
||||
func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, removeTargets []string, sendChan chan<- network.WSResponse) {
|
||||
for _, target := range removeTargets {
|
||||
stopChan, exists := stopChanMap[target]
|
||||
if !exists {
|
||||
|
|
@ -402,17 +379,18 @@ func removeTargets(ctx context.Context, stopChanMap map[string]chan struct{}, re
|
|||
}
|
||||
}
|
||||
|
||||
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- []network.RealTimePullTarget) {
|
||||
specialTarget := network.RealTimePullTarget{
|
||||
ID: constants.SysCtrlAllRemoved,
|
||||
Datas: []network.RealTimePullData{},
|
||||
func sendSpecialStatusToClient(ctx context.Context, sendChan chan<- network.WSResponse) {
|
||||
// TODO 使用 constants.SysCtrlPrefix + switch-case 形式应对可能的业务扩展
|
||||
resp := network.WSResponse{
|
||||
Code: constants.RespCodeSuccessWithNoSub,
|
||||
Msg: "all targets removed",
|
||||
Payload: map[string]int{"active_targets_count": 0},
|
||||
}
|
||||
|
||||
select {
|
||||
case sendChan <- []network.RealTimePullTarget{specialTarget}:
|
||||
logger.Info(ctx, "sent 2101 status request to sendChan")
|
||||
case sendChan <- resp:
|
||||
default:
|
||||
logger.Warn(ctx, "sendChan is full, skipping 2101 status message")
|
||||
logger.Warn(ctx, "sendChan is full, skipping 2101 status")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -463,7 +441,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
|
|||
}
|
||||
|
||||
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
|
||||
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize)
|
||||
members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"modelRT/constants"
|
||||
|
|
@ -33,42 +32,42 @@ func init() {
|
|||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅"
|
||||
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
// @Success 2000 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
//
|
||||
// @Example 200 {
|
||||
// "code": 200,
|
||||
// "msg": "success",
|
||||
// @Example 2000 {
|
||||
// "code": 2000,
|
||||
// "msg": "process completed",
|
||||
// "payload": {
|
||||
// "targets": [
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms",
|
||||
// "code": "1001",
|
||||
// "code": "20000",
|
||||
// "msg": "subscription success"
|
||||
// },
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||
// "code": "1002",
|
||||
// "code": "20000",
|
||||
// "msg": "subscription failed"
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
// @Failure 3000 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表"
|
||||
//
|
||||
// @Example 400 {
|
||||
// "code": 400,
|
||||
// "msg": "failed to get recommend data from redis",
|
||||
// @Example 3000 {
|
||||
// "code": 3000,
|
||||
// "msg": "process completed with partial failures",
|
||||
// "payload": {
|
||||
// "targets": [
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms",
|
||||
// "code": "1002",
|
||||
// "code": "40005",
|
||||
// "msg": "subscription failed"
|
||||
// },
|
||||
// {
|
||||
// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms",
|
||||
// "code": "1002",
|
||||
// "code": "50001",
|
||||
// "msg": "subscription failed"
|
||||
// }
|
||||
// ]
|
||||
|
|
@ -83,10 +82,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
|
||||
if err := c.ShouldBindJSON(&request); err != nil {
|
||||
logger.Error(c, "failed to unmarshal real time query request", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -95,10 +91,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to generate client id", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
clientID = id.String()
|
||||
|
|
@ -123,110 +116,74 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
results, err := globalSubState.CreateConfig(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "create real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
case constants.SubStopAction:
|
||||
results, err := globalSubState.RemoveTargets(c, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "remove target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
case constants.SubAppendAction:
|
||||
results, err := globalSubState.AppendTargets(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "append target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
case constants.SubUpdateAction:
|
||||
results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements)
|
||||
if err != nil {
|
||||
logger.Error(c, "update target to real time data subscription config failed", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespFailure(c, constants.RespCodeFailed, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "success", network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
default:
|
||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
},
|
||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), network.RealTimeSubPayload{
|
||||
ClientID: clientID,
|
||||
TargetResults: results,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
|
@ -283,12 +240,12 @@ func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measure
|
|||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||
targetResult.Code = constants.SubFailedCode
|
||||
targetResult.Code = constants.CodeFoundTargetFailed
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
}
|
||||
targetResult.Code = constants.SubSuccessCode
|
||||
targetResult.Code = constants.CodeSuccess
|
||||
targetResult.Msg = constants.SubSuccessMsg
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
successfulTargets = append(successfulTargets, target)
|
||||
|
|
@ -327,7 +284,7 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
|||
if _, exist := config.targetContext[target]; !exist {
|
||||
err := fmt.Errorf("target %s does not exists in subscription list", target)
|
||||
logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target)
|
||||
targetResult.Code = constants.UpdateSubFailedCode
|
||||
targetResult.Code = constants.CodeUpdateSubTargetMissing
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
|
|
@ -336,13 +293,13 @@ func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config
|
|||
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
|
||||
targetResult.Code = constants.UpdateSubFailedCode
|
||||
targetResult.Code = constants.CodeDBQueryFailed
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
continue
|
||||
}
|
||||
|
||||
targetResult.Code = constants.UpdateSubSuccessCode
|
||||
targetResult.Code = constants.CodeSuccess
|
||||
targetResult.Msg = constants.UpdateSubSuccessMsg
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
successfulTargets = append(successfulTargets, target)
|
||||
|
|
@ -473,7 +430,7 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI
|
|||
if !exist {
|
||||
err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID)
|
||||
logger.Error(ctx, "clientID not found. use CreateConfig to start a new config", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeAppendSubTargetMissing, err), err
|
||||
}
|
||||
|
||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount)
|
||||
|
|
@ -507,7 +464,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS
|
|||
|
||||
for index := range resultsSlice {
|
||||
if _, isTarget := set[resultsSlice[index].ID]; isTarget {
|
||||
resultsSlice[index].Code = constants.SubRepeatCode
|
||||
resultsSlice[index].Code = constants.CodeSubTargetRepeat
|
||||
resultsSlice[index].Msg = constants.SubRepeatMsg
|
||||
}
|
||||
}
|
||||
|
|
@ -575,7 +532,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
s.globalMutex.RUnlock()
|
||||
err := fmt.Errorf("clientID %s not found", clientID)
|
||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeCancelSubTargetMissing, err), err
|
||||
}
|
||||
s.globalMutex.RUnlock()
|
||||
|
||||
|
|
@ -595,7 +552,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
for _, target := range measTargets {
|
||||
targetResult := network.TargetResult{
|
||||
ID: target,
|
||||
Code: constants.CancelSubFailedCode,
|
||||
Code: constants.CodeCancelSubTargetMissing,
|
||||
Msg: constants.CancelSubFailedMsg,
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -616,7 +573,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
transportTargets.Targets = append(transportTargets.Targets, existingTarget)
|
||||
targetResult := network.TargetResult{
|
||||
ID: existingTarget,
|
||||
Code: constants.CancelSubSuccessCode,
|
||||
Code: constants.CodeSuccess,
|
||||
Msg: constants.CancelSubSuccessMsg,
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -639,7 +596,7 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea
|
|||
for target := range targetsToRemoveMap {
|
||||
targetResult := network.TargetResult{
|
||||
ID: target,
|
||||
Code: constants.CancelSubFailedCode,
|
||||
Code: constants.CodeCancelSubTargetMissing,
|
||||
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
|
||||
}
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
|
|
@ -670,10 +627,9 @@ func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientI
|
|||
s.globalMutex.RUnlock()
|
||||
|
||||
if !exist {
|
||||
s.globalMutex.RUnlock()
|
||||
err := fmt.Errorf("clientID %s not found", clientID)
|
||||
logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err
|
||||
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err
|
||||
}
|
||||
|
||||
targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount)
|
||||
|
|
@ -722,13 +678,13 @@ func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem)
|
|||
return totalTargetsCount
|
||||
}
|
||||
|
||||
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, err error) []network.TargetResult {
|
||||
func processRealTimeRequestTargets(measurements []network.RealTimeMeasurementItem, targetCount int, businessCode int, err error) []network.TargetResult {
|
||||
targetProcessResults := make([]network.TargetResult, 0, targetCount)
|
||||
for _, measurementItem := range measurements {
|
||||
for _, target := range measurementItem.Targets {
|
||||
var targetResult network.TargetResult
|
||||
targetResult.ID = target
|
||||
targetResult.Code = constants.SubFailedCode
|
||||
targetResult.Code = businessCode
|
||||
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
|
||||
targetProcessResults = append(targetProcessResults, targetResult)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,18 +3,25 @@ package network
|
|||
|
||||
// FailureResponse define struct of standard failure API response format
|
||||
type FailureResponse struct {
|
||||
Code int `json:"code" example:"500"`
|
||||
Msg string `json:"msg" example:"failed to get recommend data from redis"`
|
||||
Code int `json:"code" example:"3000"`
|
||||
Msg string `json:"msg" example:"process completed with partial failures"`
|
||||
Payload any `json:"payload" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// SuccessResponse define struct of standard successful API response format
|
||||
type SuccessResponse struct {
|
||||
Code int `json:"code" example:"200"`
|
||||
Msg string `json:"msg" example:"success"`
|
||||
Code int `json:"code" example:"2000"`
|
||||
Msg string `json:"msg" example:"process completed"`
|
||||
Payload any `json:"payload" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// WSResponse define struct of standard websocket API response format
|
||||
type WSResponse struct {
|
||||
Code int `json:"code" example:"2000"`
|
||||
Msg string `json:"msg" example:"process completed"`
|
||||
Payload any `json:"payload,omitempty" swaggertype:"object"`
|
||||
}
|
||||
|
||||
// MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response.
|
||||
type MeasurementRecommendPayload struct {
|
||||
Input string `json:"input" example:"transformfeeder1_220."`
|
||||
|
|
@ -26,7 +33,7 @@ type MeasurementRecommendPayload struct {
|
|||
// TargetResult define struct of target item in real time data subscription response payload
|
||||
type TargetResult struct {
|
||||
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
|
||||
Code string `json:"code" example:"1001"`
|
||||
Code int `json:"code" example:"20000"`
|
||||
Msg string `json:"msg" example:"subscription success"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -205,13 +205,20 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
|||
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
|
||||
return
|
||||
case <-ticker.C:
|
||||
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize)
|
||||
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
|
||||
cancel()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
realTimedatas := util.ConvertZSetMembersToFloat64(members)
|
||||
if len(realTimedatas) == 0 {
|
||||
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
|
||||
continue
|
||||
}
|
||||
|
||||
if conf.Analyzer != nil {
|
||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package util
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
|
@ -13,25 +14,36 @@ import (
|
|||
)
|
||||
|
||||
// NewRedisClient define func of initialize the Redis client
|
||||
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
|
||||
func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
|
||||
// default options
|
||||
options := RedisOptions{
|
||||
redisOptions: &redis.Options{
|
||||
Addr: addr,
|
||||
configs := &clientConfig{
|
||||
Options: &redis.Options{
|
||||
Addr: addr,
|
||||
DialTimeout: 5 * time.Second,
|
||||
ReadTimeout: 3 * time.Second,
|
||||
WriteTimeout: 3 * time.Second,
|
||||
PoolSize: 10,
|
||||
},
|
||||
}
|
||||
|
||||
// Apply configuration options from config
|
||||
var errs []error
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
if err := opt(configs); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
// create redis client
|
||||
client := redis.NewClient(options.redisOptions)
|
||||
client := redis.NewClient(configs.Options)
|
||||
|
||||
if options.timeout > 0 {
|
||||
if configs.DialTimeout > 0 {
|
||||
// check if the connection is successful
|
||||
ctx, cancel := context.WithTimeout(context.Background(), options.timeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
|
||||
defer cancel()
|
||||
if err := client.Ping(ctx).Err(); err != nil {
|
||||
return nil, fmt.Errorf("can not connect redis:%v", err)
|
||||
|
|
@ -43,22 +55,29 @@ func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
|
|||
// NewRedigoPool define func of initialize the Redigo pool
|
||||
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
||||
pool := &redigo.Pool{
|
||||
MaxIdle: rCfg.PoolSize / 2,
|
||||
MaxActive: rCfg.PoolSize,
|
||||
// TODO optimize IdleTimeout with config parameter
|
||||
MaxIdle: rCfg.PoolSize / 2,
|
||||
MaxActive: rCfg.PoolSize,
|
||||
IdleTimeout: 240 * time.Second,
|
||||
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
|
||||
if time.Since(t) < time.Minute {
|
||||
return nil
|
||||
}
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
|
||||
// Dial function to create the connection
|
||||
Dial: func() (redigo.Conn, error) {
|
||||
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒
|
||||
dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
|
||||
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
|
||||
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
|
||||
|
||||
opts := []redigo.DialOption{
|
||||
redigo.DialDatabase(rCfg.DB),
|
||||
redigo.DialPassword(rCfg.Password),
|
||||
redigo.DialConnectTimeout(timeout),
|
||||
// redigo.DialReadTimeout(timeout),
|
||||
// redigo.DialWriteTimeout(timeout),
|
||||
// TODO add redigo.DialUsername when redis open acl
|
||||
// redis.DialUsername("username"),
|
||||
redigo.DialConnectTimeout(dialTimeout),
|
||||
redigo.DialReadTimeout(readTimeout),
|
||||
redigo.DialWriteTimeout(writeTimeout),
|
||||
}
|
||||
|
||||
c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
|
||||
|
|
@ -72,13 +91,14 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
|
|||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if conn.Err() != nil {
|
||||
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err())
|
||||
if err := conn.Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
|
||||
}
|
||||
|
||||
_, err := conn.Do("PING")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,53 +8,74 @@ import (
|
|||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type RedisOptions struct {
|
||||
redisOptions *redis.Options
|
||||
timeout time.Duration
|
||||
type clientConfig struct {
|
||||
*redis.Options
|
||||
}
|
||||
|
||||
type RedisOption func(*RedisOptions) error
|
||||
type Option func(*clientConfig) error
|
||||
|
||||
// WithPassword define func of configure redis password options
|
||||
func WithPassword(password string) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
func WithPassword(password string) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if password == "" {
|
||||
return errors.New("password is empty")
|
||||
}
|
||||
o.redisOptions.Password = password
|
||||
c.Password = password
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout define func of configure redis timeout options
|
||||
func WithTimeout(timeout time.Duration) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
// WithConnectTimeout define func of configure redis connect timeout options
|
||||
func WithConnectTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
o.timeout = timeout
|
||||
c.DialTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadTimeout define func of configure redis read timeout options
|
||||
func WithReadTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
c.ReadTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWriteTimeout define func of configure redis write timeout options
|
||||
func WithWriteTimeout(timeout time.Duration) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if timeout < 0 {
|
||||
return errors.New("timeout can not be negative")
|
||||
}
|
||||
c.WriteTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithDB define func of configure redis db options
|
||||
func WithDB(db int) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
func WithDB(db int) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if db < 0 {
|
||||
return errors.New("db can not be negative")
|
||||
}
|
||||
o.redisOptions.DB = db
|
||||
c.DB = db
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPoolSize define func of configure pool size options
|
||||
func WithPoolSize(poolSize int) RedisOption {
|
||||
return func(o *RedisOptions) error {
|
||||
func WithPoolSize(poolSize int) Option {
|
||||
return func(c *clientConfig) error {
|
||||
if poolSize <= 0 {
|
||||
return errors.New("pool size must be greater than 0")
|
||||
}
|
||||
o.redisOptions.PoolSize = poolSize
|
||||
c.PoolSize = poolSize
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue