From 617d21500eab8bf4383d09cdc5e7be19539112f6 Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 27 Jan 2026 17:41:17 +0800 Subject: [PATCH] optimize code of redis connenct func and real time data calculate --- config/config.go | 12 ++-- ...usiness_code.go => rtdata_subscription.go} | 23 -------- diagram/redis_client.go | 4 +- diagram/redis_init.go | 4 +- distributedlock/locker_init.go | 4 +- handler/real_time_data_pull.go | 2 +- handler/real_time_data_subscription.go | 1 - real-time-data/real_time_data_computing.go | 9 ++- util/redis_init.go | 58 +++++++++++++------ util/redis_options.go | 55 ++++++++++++------ 10 files changed, 101 insertions(+), 71 deletions(-) rename constants/{subscription_business_code.go => rtdata_subscription.go} (74%) diff --git a/config/config.go b/config/config.go index 21e4129..f44c11b 100644 --- a/config/config.go +++ b/config/config.go @@ -53,11 +53,13 @@ type LoggerConfig struct { // RedisConfig define config struct of redis config type RedisConfig struct { - Addr string `mapstructure:"addr"` - Password string `mapstructure:"password"` - DB int `mapstructure:"db"` - PoolSize int `mapstructure:"poolsize"` - Timeout int `mapstructure:"timeout"` + Addr string `mapstructure:"addr"` + Password string `mapstructure:"password"` + DB int `mapstructure:"db"` + PoolSize int `mapstructure:"poolsize"` + DialTimeout int `mapstructure:"dial_timeout"` + ReadTimeout int `mapstructure:"read_timeout"` + WriteTimeout int `mapstructure:"write_timeout"` } // AntsConfig define config struct of ants pool config diff --git a/constants/subscription_business_code.go b/constants/rtdata_subscription.go similarity index 74% rename from constants/subscription_business_code.go rename to constants/rtdata_subscription.go index 4b202bb..e14a3da 100644 --- a/constants/subscription_business_code.go +++ b/constants/rtdata_subscription.go @@ -12,29 +12,6 @@ const ( SubUpdateAction string = "update" ) -// 定义状态常量 -// TODO 从4位格式修改为5位格式 -const ( - // SubSuccessCode define subscription success code - SubSuccessCode = "1001" - // SubFailedCode define subscription failed code - SubFailedCode = "1002" - // RTDSuccessCode define real time data return success code - RTDSuccessCode = "1003" - // RTDFailedCode define real time data return failed code - RTDFailedCode = "1004" - // CancelSubSuccessCode define cancel subscription success code - CancelSubSuccessCode = "1005" - // CancelSubFailedCode define cancel subscription failed code - CancelSubFailedCode = "1006" - // SubRepeatCode define subscription repeat code - SubRepeatCode = "1007" - // UpdateSubSuccessCode define update subscription success code - UpdateSubSuccessCode = "1008" - // UpdateSubFailedCode define update subscription failed code - UpdateSubFailedCode = "1009" -) - const ( // SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine SysCtrlPrefix = "SYS_CTRL_" diff --git a/diagram/redis_client.go b/diagram/redis_client.go index 283669d..4b673a8 100644 --- a/diagram/redis_client.go +++ b/diagram/redis_client.go @@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient { } } -// QueryByZRangeByLex define func to query real time data from redis zset -func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) { +// QueryByZRange define func to query real time data from redis zset +func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) { client := rc.Client args := redis.ZRangeArgs{ Key: key, diff --git a/diagram/redis_init.go b/diagram/redis_init.go index d1d7a22..273dd22 100644 --- a/diagram/redis_init.go +++ b/diagram/redis_init.go @@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client { util.WithPassword(rCfg.Password), util.WithDB(rCfg.DB), util.WithPoolSize(rCfg.PoolSize), - util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second), + util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second), + util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second), + util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second), ) if err != nil { panic(err) diff --git a/distributedlock/locker_init.go b/distributedlock/locker_init.go index 35ecc78..e3a7bc2 100644 --- a/distributedlock/locker_init.go +++ b/distributedlock/locker_init.go @@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client { util.WithPassword(rCfg.Password), util.WithDB(rCfg.DB), util.WithPoolSize(rCfg.PoolSize), - util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second), + util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second), + util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second), + util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second), ) if err != nil { panic(err) diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 5b298ae..a360a49 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -472,7 +472,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, } func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) { - members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize) + members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize) if err != nil { logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) return diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 4f87f33..afc396b 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -627,7 +627,6 @@ func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientI s.globalMutex.RUnlock() if !exist { - s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found", clientID) logger.Error(ctx, "clientID not found in remove targets operation", "error", err) return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 6de1ab1..88a2a8a 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -205,13 +205,20 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") return case <-ticker.C: - members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize) + queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize) + cancel() if err != nil { logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) continue } realTimedatas := util.ConvertZSetMembersToFloat64(members) + if len(realTimedatas) == 0 { + logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey) + continue + } + if conf.Analyzer != nil { conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas) } else { diff --git a/util/redis_init.go b/util/redis_init.go index 8ea504a..f2c61a2 100644 --- a/util/redis_init.go +++ b/util/redis_init.go @@ -3,6 +3,7 @@ package util import ( "context" + "errors" "fmt" "time" @@ -13,25 +14,36 @@ import ( ) // NewRedisClient define func of initialize the Redis client -func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) { +func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) { // default options - options := RedisOptions{ - redisOptions: &redis.Options{ - Addr: addr, + configs := &clientConfig{ + Options: &redis.Options{ + Addr: addr, + DialTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + PoolSize: 10, }, } // Apply configuration options from config + var errs []error for _, opt := range opts { - opt(&options) + if err := opt(configs); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...)) } // create redis client - client := redis.NewClient(options.redisOptions) + client := redis.NewClient(configs.Options) - if options.timeout > 0 { + if configs.DialTimeout > 0 { // check if the connection is successful - ctx, cancel := context.WithTimeout(context.Background(), options.timeout) + ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout) defer cancel() if err := client.Ping(ctx).Err(); err != nil { return nil, fmt.Errorf("can not connect redis:%v", err) @@ -43,22 +55,29 @@ func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) { // NewRedigoPool define func of initialize the Redigo pool func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) { pool := &redigo.Pool{ - MaxIdle: rCfg.PoolSize / 2, - MaxActive: rCfg.PoolSize, // TODO optimize IdleTimeout with config parameter + MaxIdle: rCfg.PoolSize / 2, + MaxActive: rCfg.PoolSize, IdleTimeout: 240 * time.Second, + TestOnBorrow: func(c redigo.Conn, t time.Time) error { + if time.Since(t) < time.Minute { + return nil + } + _, err := c.Do("PING") + return err + }, - // Dial function to create the connection Dial: func() (redigo.Conn, error) { - timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒 + dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second + readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second + writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second + opts := []redigo.DialOption{ redigo.DialDatabase(rCfg.DB), redigo.DialPassword(rCfg.Password), - redigo.DialConnectTimeout(timeout), - // redigo.DialReadTimeout(timeout), - // redigo.DialWriteTimeout(timeout), - // TODO add redigo.DialUsername when redis open acl - // redis.DialUsername("username"), + redigo.DialConnectTimeout(dialTimeout), + redigo.DialReadTimeout(readTimeout), + redigo.DialWriteTimeout(writeTimeout), } c, err := redigo.Dial("tcp", rCfg.Addr, opts...) @@ -72,13 +91,14 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) { conn := pool.Get() defer conn.Close() - if conn.Err() != nil { - return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err()) + if err := conn.Err(); err != nil { + return nil, fmt.Errorf("failed to get connection from pool: %w", err) } _, err := conn.Do("PING") if err != nil { return nil, fmt.Errorf("redis connection test (PING) failed: %w", err) } + return pool, nil } diff --git a/util/redis_options.go b/util/redis_options.go index dbb4feb..6293105 100644 --- a/util/redis_options.go +++ b/util/redis_options.go @@ -8,53 +8,74 @@ import ( "github.com/redis/go-redis/v9" ) -type RedisOptions struct { - redisOptions *redis.Options - timeout time.Duration +type clientConfig struct { + *redis.Options } -type RedisOption func(*RedisOptions) error +type Option func(*clientConfig) error // WithPassword define func of configure redis password options -func WithPassword(password string) RedisOption { - return func(o *RedisOptions) error { +func WithPassword(password string) Option { + return func(c *clientConfig) error { if password == "" { return errors.New("password is empty") } - o.redisOptions.Password = password + c.Password = password return nil } } -// WithTimeout define func of configure redis timeout options -func WithTimeout(timeout time.Duration) RedisOption { - return func(o *RedisOptions) error { +// WithConnectTimeout define func of configure redis connect timeout options +func WithConnectTimeout(timeout time.Duration) Option { + return func(c *clientConfig) error { if timeout < 0 { return errors.New("timeout can not be negative") } - o.timeout = timeout + c.DialTimeout = timeout + return nil + } +} + +// WithReadTimeout define func of configure redis read timeout options +func WithReadTimeout(timeout time.Duration) Option { + return func(c *clientConfig) error { + if timeout < 0 { + return errors.New("timeout can not be negative") + } + c.ReadTimeout = timeout + return nil + } +} + +// WithWriteTimeout define func of configure redis write timeout options +func WithWriteTimeout(timeout time.Duration) Option { + return func(c *clientConfig) error { + if timeout < 0 { + return errors.New("timeout can not be negative") + } + c.WriteTimeout = timeout return nil } } // WithDB define func of configure redis db options -func WithDB(db int) RedisOption { - return func(o *RedisOptions) error { +func WithDB(db int) Option { + return func(c *clientConfig) error { if db < 0 { return errors.New("db can not be negative") } - o.redisOptions.DB = db + c.DB = db return nil } } // WithPoolSize define func of configure pool size options -func WithPoolSize(poolSize int) RedisOption { - return func(o *RedisOptions) error { +func WithPoolSize(poolSize int) Option { + return func(c *clientConfig) error { if poolSize <= 0 { return errors.New("pool size must be greater than 0") } - o.redisOptions.PoolSize = poolSize + c.PoolSize = poolSize return nil } }