optimize code of redis connenct func and real time data calculate

This commit is contained in:
douxu 2026-01-27 17:41:17 +08:00
parent 1a1727adab
commit 617d21500e
10 changed files with 101 additions and 71 deletions

View File

@ -53,11 +53,13 @@ type LoggerConfig struct {
// RedisConfig define config struct of redis config // RedisConfig define config struct of redis config
type RedisConfig struct { type RedisConfig struct {
Addr string `mapstructure:"addr"` Addr string `mapstructure:"addr"`
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
DB int `mapstructure:"db"` DB int `mapstructure:"db"`
PoolSize int `mapstructure:"poolsize"` PoolSize int `mapstructure:"poolsize"`
Timeout int `mapstructure:"timeout"` DialTimeout int `mapstructure:"dial_timeout"`
ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"`
} }
// AntsConfig define config struct of ants pool config // AntsConfig define config struct of ants pool config

View File

@ -12,29 +12,6 @@ const (
SubUpdateAction string = "update" SubUpdateAction string = "update"
) )
// 定义状态常量
// TODO 从4位格式修改为5位格式
const (
// SubSuccessCode define subscription success code
SubSuccessCode = "1001"
// SubFailedCode define subscription failed code
SubFailedCode = "1002"
// RTDSuccessCode define real time data return success code
RTDSuccessCode = "1003"
// RTDFailedCode define real time data return failed code
RTDFailedCode = "1004"
// CancelSubSuccessCode define cancel subscription success code
CancelSubSuccessCode = "1005"
// CancelSubFailedCode define cancel subscription failed code
CancelSubFailedCode = "1006"
// SubRepeatCode define subscription repeat code
SubRepeatCode = "1007"
// UpdateSubSuccessCode define update subscription success code
UpdateSubSuccessCode = "1008"
// UpdateSubFailedCode define update subscription failed code
UpdateSubFailedCode = "1009"
)
const ( const (
// SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine // SysCtrlPrefix define to indicates the prefix for all system control directives,facilitating unified parsing within the sendDataStream goroutine
SysCtrlPrefix = "SYS_CTRL_" SysCtrlPrefix = "SYS_CTRL_"

View File

@ -19,8 +19,8 @@ func NewRedisClient() *RedisClient {
} }
} }
// QueryByZRangeByLex define func to query real time data from redis zset // QueryByZRange define func to query real time data from redis zset
func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) { func (rc *RedisClient) QueryByZRange(ctx context.Context, key string, size int64) ([]redis.Z, error) {
client := rc.Client client := rc.Client
args := redis.ZRangeArgs{ args := redis.ZRangeArgs{
Key: key, Key: key,

View File

@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
util.WithPassword(rCfg.Password), util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB), util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize), util.WithPoolSize(rCfg.PoolSize),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second), util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
) )
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -22,7 +22,9 @@ func initClient(rCfg config.RedisConfig) *redis.Client {
util.WithPassword(rCfg.Password), util.WithPassword(rCfg.Password),
util.WithDB(rCfg.DB), util.WithDB(rCfg.DB),
util.WithPoolSize(rCfg.PoolSize), util.WithPoolSize(rCfg.PoolSize),
util.WithTimeout(time.Duration(rCfg.Timeout)*time.Second), util.WithConnectTimeout(time.Duration(rCfg.DialTimeout)*time.Second),
util.WithReadTimeout(time.Duration(rCfg.ReadTimeout)*time.Second),
util.WithWriteTimeout(time.Duration(rCfg.WriteTimeout)*time.Second),
) )
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -472,7 +472,7 @@ func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig,
} }
func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) { func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) {
members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize) members, err := client.QueryByZRange(ctx, config.queryKey, config.dataSize)
if err != nil { if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err)
return return

View File

@ -627,7 +627,6 @@ func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientI
s.globalMutex.RUnlock() s.globalMutex.RUnlock()
if !exist { if !exist {
s.globalMutex.RUnlock()
err := fmt.Errorf("clientID %s not found", clientID) err := fmt.Errorf("clientID %s not found", clientID)
logger.Error(ctx, "clientID not found in remove targets operation", "error", err) logger.Error(ctx, "clientID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err return processRealTimeRequestTargets(measurements, requestTargetsCount, constants.CodeUpdateSubTargetMissing, err), err

View File

@ -205,13 +205,20 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) {
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
return return
case <-ticker.C: case <-ticker.C:
members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize) queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
cancel()
if err != nil { if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
continue continue
} }
realTimedatas := util.ConvertZSetMembersToFloat64(members) realTimedatas := util.ConvertZSetMembersToFloat64(members)
if len(realTimedatas) == 0 {
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
continue
}
if conf.Analyzer != nil { if conf.Analyzer != nil {
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas) conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
} else { } else {

View File

@ -3,6 +3,7 @@ package util
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
@ -13,25 +14,36 @@ import (
) )
// NewRedisClient define func of initialize the Redis client // NewRedisClient define func of initialize the Redis client
func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) { func NewRedisClient(addr string, opts ...Option) (*redis.Client, error) {
// default options // default options
options := RedisOptions{ configs := &clientConfig{
redisOptions: &redis.Options{ Options: &redis.Options{
Addr: addr, Addr: addr,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolSize: 10,
}, },
} }
// Apply configuration options from config // Apply configuration options from config
var errs []error
for _, opt := range opts { for _, opt := range opts {
opt(&options) if err := opt(configs); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return nil, fmt.Errorf("failed to apply options: %w", errors.Join(errs...))
} }
// create redis client // create redis client
client := redis.NewClient(options.redisOptions) client := redis.NewClient(configs.Options)
if options.timeout > 0 { if configs.DialTimeout > 0 {
// check if the connection is successful // check if the connection is successful
ctx, cancel := context.WithTimeout(context.Background(), options.timeout) ctx, cancel := context.WithTimeout(context.Background(), configs.DialTimeout)
defer cancel() defer cancel()
if err := client.Ping(ctx).Err(); err != nil { if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("can not connect redis:%v", err) return nil, fmt.Errorf("can not connect redis:%v", err)
@ -43,22 +55,29 @@ func NewRedisClient(addr string, opts ...RedisOption) (*redis.Client, error) {
// NewRedigoPool define func of initialize the Redigo pool // NewRedigoPool define func of initialize the Redigo pool
func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) { func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
pool := &redigo.Pool{ pool := &redigo.Pool{
MaxIdle: rCfg.PoolSize / 2,
MaxActive: rCfg.PoolSize,
// TODO optimize IdleTimeout with config parameter // TODO optimize IdleTimeout with config parameter
MaxIdle: rCfg.PoolSize / 2,
MaxActive: rCfg.PoolSize,
IdleTimeout: 240 * time.Second, IdleTimeout: 240 * time.Second,
TestOnBorrow: func(c redigo.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
// Dial function to create the connection
Dial: func() (redigo.Conn, error) { Dial: func() (redigo.Conn, error) {
timeout := time.Duration(rCfg.Timeout) * time.Millisecond // 假设 rCfg.Timeout 是毫秒 dialTimeout := time.Duration(rCfg.DialTimeout) * time.Second
readTimeout := time.Duration(rCfg.ReadTimeout) * time.Second
writeTimeout := time.Duration(rCfg.WriteTimeout) * time.Second
opts := []redigo.DialOption{ opts := []redigo.DialOption{
redigo.DialDatabase(rCfg.DB), redigo.DialDatabase(rCfg.DB),
redigo.DialPassword(rCfg.Password), redigo.DialPassword(rCfg.Password),
redigo.DialConnectTimeout(timeout), redigo.DialConnectTimeout(dialTimeout),
// redigo.DialReadTimeout(timeout), redigo.DialReadTimeout(readTimeout),
// redigo.DialWriteTimeout(timeout), redigo.DialWriteTimeout(writeTimeout),
// TODO add redigo.DialUsername when redis open acl
// redis.DialUsername("username"),
} }
c, err := redigo.Dial("tcp", rCfg.Addr, opts...) c, err := redigo.Dial("tcp", rCfg.Addr, opts...)
@ -72,13 +91,14 @@ func NewRedigoPool(rCfg config.RedisConfig) (*redigo.Pool, error) {
conn := pool.Get() conn := pool.Get()
defer conn.Close() defer conn.Close()
if conn.Err() != nil { if err := conn.Err(); err != nil {
return nil, fmt.Errorf("failed to get connection from pool: %w", conn.Err()) return nil, fmt.Errorf("failed to get connection from pool: %w", err)
} }
_, err := conn.Do("PING") _, err := conn.Do("PING")
if err != nil { if err != nil {
return nil, fmt.Errorf("redis connection test (PING) failed: %w", err) return nil, fmt.Errorf("redis connection test (PING) failed: %w", err)
} }
return pool, nil return pool, nil
} }

View File

@ -8,53 +8,74 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
type RedisOptions struct { type clientConfig struct {
redisOptions *redis.Options *redis.Options
timeout time.Duration
} }
type RedisOption func(*RedisOptions) error type Option func(*clientConfig) error
// WithPassword define func of configure redis password options // WithPassword define func of configure redis password options
func WithPassword(password string) RedisOption { func WithPassword(password string) Option {
return func(o *RedisOptions) error { return func(c *clientConfig) error {
if password == "" { if password == "" {
return errors.New("password is empty") return errors.New("password is empty")
} }
o.redisOptions.Password = password c.Password = password
return nil return nil
} }
} }
// WithTimeout define func of configure redis timeout options // WithConnectTimeout define func of configure redis connect timeout options
func WithTimeout(timeout time.Duration) RedisOption { func WithConnectTimeout(timeout time.Duration) Option {
return func(o *RedisOptions) error { return func(c *clientConfig) error {
if timeout < 0 { if timeout < 0 {
return errors.New("timeout can not be negative") return errors.New("timeout can not be negative")
} }
o.timeout = timeout c.DialTimeout = timeout
return nil
}
}
// WithReadTimeout define func of configure redis read timeout options
func WithReadTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.ReadTimeout = timeout
return nil
}
}
// WithWriteTimeout define func of configure redis write timeout options
func WithWriteTimeout(timeout time.Duration) Option {
return func(c *clientConfig) error {
if timeout < 0 {
return errors.New("timeout can not be negative")
}
c.WriteTimeout = timeout
return nil return nil
} }
} }
// WithDB define func of configure redis db options // WithDB define func of configure redis db options
func WithDB(db int) RedisOption { func WithDB(db int) Option {
return func(o *RedisOptions) error { return func(c *clientConfig) error {
if db < 0 { if db < 0 {
return errors.New("db can not be negative") return errors.New("db can not be negative")
} }
o.redisOptions.DB = db c.DB = db
return nil return nil
} }
} }
// WithPoolSize define func of configure pool size options // WithPoolSize define func of configure pool size options
func WithPoolSize(poolSize int) RedisOption { func WithPoolSize(poolSize int) Option {
return func(o *RedisOptions) error { return func(c *clientConfig) error {
if poolSize <= 0 { if poolSize <= 0 {
return errors.New("pool size must be greater than 0") return errors.New("pool size must be greater than 0")
} }
o.redisOptions.PoolSize = poolSize c.PoolSize = poolSize
return nil return nil
} }
} }