From fda43c65d288c488edaaf611a639a94cd5b23333 Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 7 Apr 2025 16:49:06 +0800 Subject: [PATCH] optimize the subscription process of redis locker --- distributedlock/redis_lock.go | 107 +++++++++++++------------------- distributedlock/redis_rwlock.go | 76 +++++++++++++---------- distributedlock/rwlock_test.go | 20 +++--- 3 files changed, 97 insertions(+), 106 deletions(-) diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index 567160c..f96522e 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -34,20 +34,21 @@ type RedissionLockConfig struct { } type redissionLocker struct { - lockLeaseTime uint64 - token string - key string - waitChanKey string - needRefresh bool - exit chan struct{} - client *redis.Client - once *sync.Once - logger *zap.Logger + lockLeaseTime uint64 + token string + key string + waitChanKey string + needRefresh bool + refreshExitChan chan struct{} + subExitChan chan struct{} + client *redis.Client + refreshOnce *sync.Once + logger *zap.Logger } func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) error { - if rl.exit == nil { - rl.exit = make(chan struct{}) + if rl.refreshExitChan == nil { + rl.refreshExitChan = make(chan struct{}) } result := rl.tryLock(ctx).(*constant.RedisResult) if result.Code == constant.UnknownInternalError { @@ -56,7 +57,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e } if (result.Code == constant.LockSuccess) && rl.needRefresh { - rl.once.Do(func() { + rl.refreshOnce.Do(func() { // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout(ctx) }) @@ -67,7 +68,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e defer close(subMsg) sub := rl.client.Subscribe(ctx, rl.waitChanKey) defer sub.Close() - go rl.subscribeLock(ctx, sub, subMsg) + go rl.subscribeLock(sub, subMsg) if len(timeout) > 0 && timeout[0] > 0 { acquireTimer := time.NewTimer(timeout[0]) @@ -100,53 +101,21 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e return fmt.Errorf("lock the redis lock failed:%w", result) } -// TODO 优化订阅流程 -func (rl *redissionLocker) subscribeLock(ctx context.Context, sub *redis.PubSub, out chan struct{}) { - if sub == nil || out == nil { +func (rl *redissionLocker) subscribeLock(sub *redis.PubSub, subMsgChan chan struct{}) { + if sub == nil || subMsgChan == nil { return } rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token)) - // subCh := sub.Channel() - // for msg := range subCh { - // // 这里只会收到真正的数据消息 - // fmt.Printf("Channel: %s, Payload: %s\n", - // msg.Channel, - // msg.Payload) - // } - - receiveChan := make(chan interface{}, 2) - go func() { - for { - msg, err := sub.Receive(ctx) - if err != nil { - if errors.Is(err, redis.ErrClosed) { - return - } - rl.logger.Error("sub receive message failed", zap.Error(err)) - continue - } - rl.logger.Info("sub receive message", zap.Any("msg", msg)) - receiveChan <- msg - } - }() - for { select { - case <-rl.exit: + case <-rl.subExitChan: + close(subMsgChan) return - case msg := <-receiveChan: - switch msg.(type) { - case *redis.Subscription: - // Ignore. - case *redis.Pong: - // Ignore. - case *redis.Message: - out <- struct{}{} - default: - } - // case <-subCh: - // out <- struct{}{} + case <-sub.Channel(): + // 这里只会收到真正的数据消息 + subMsgChan <- struct{}{} + default: } } } @@ -183,16 +152,26 @@ func (rl *redissionLocker) refreshLockTimeout(ctx context.Context) { rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) } timer.Reset(lockTime) - case <-rl.exit: + case <-rl.refreshExitChan: return } } } func (rl *redissionLocker) cancelRefreshLockTime() { - if rl.exit != nil { - close(rl.exit) - rl.once = &sync.Once{} + if rl.refreshExitChan != nil { + close(rl.refreshExitChan) + rl.refreshOnce = &sync.Once{} + } +} + +func (rl *redissionLocker) closeSub(sub *redis.PubSub, noticeChan chan struct{}) { + if sub != nil { + sub.Close() + } + + if noticeChan != nil { + close(noticeChan) } } @@ -264,13 +243,13 @@ func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker } r := &redissionLocker{ - token: ops.Token, - key: strings.Join([]string{ops.Prefix, ops.Key}, ":"), - waitChanKey: strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":"), - needRefresh: ops.NeedRefresh, - client: client, - exit: make(chan struct{}), - logger: logger.GetLoggerInstance(), + token: ops.Token, + key: strings.Join([]string{ops.Prefix, ops.Key}, ":"), + waitChanKey: strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":"), + needRefresh: ops.NeedRefresh, + client: client, + refreshExitChan: make(chan struct{}), + logger: logger.GetLoggerInstance(), } return r } diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index af8e797..f34c839 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -25,10 +25,6 @@ type RedissionRWLocker struct { } func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration) error { - if rl.exit == nil { - rl.exit = make(chan struct{}) - } - result := rl.tryRLock(ctx).(*constant.RedisResult) if result.Code == constant.UnknownInternalError { rl.logger.Error(result.OutputResultMessage()) @@ -37,7 +33,11 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration if result.Code == constant.LockSuccess { if rl.needRefresh { - rl.once.Do(func() { + rl.refreshOnce.Do(func() { + if rl.refreshExitChan == nil { + rl.refreshExitChan = make(chan struct{}) + } + // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout(ctx) }) @@ -46,37 +46,41 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration return nil } - subMsg := make(chan struct{}, 1) - defer close(subMsg) - sub := rl.client.Subscribe(ctx, rl.readWaitChanKey) - defer sub.Close() - go rl.subscribeLock(ctx, sub, subMsg) - if len(timeout) > 0 && timeout[0] > 0 { + if rl.subExitChan == nil { + rl.subExitChan = make(chan struct{}) + } + + subMsgChan := make(chan struct{}, 1) + sub := rl.client.Subscribe(ctx, rl.readWaitChanKey) + go rl.subscribeLock(sub, subMsgChan) + acquireTimer := time.NewTimer(timeout[0]) for { select { - case _, ok := <-subMsg: + case _, ok := <-subMsgChan: if !ok { err := errors.New("failed to read the read lock waiting for for the channel message") rl.logger.Error("failed to read the read lock waiting for for the channel message") return err } - resultErr := rl.tryRLock(ctx).(*constant.RedisResult) - if (resultErr.Code == constant.RLockFailureWithWLockOccupancy) || (resultErr.Code == constant.UnknownInternalError) { - rl.logger.Info(resultErr.OutputResultMessage()) + result := rl.tryRLock(ctx).(*constant.RedisResult) + if (result.Code == constant.RLockFailureWithWLockOccupancy) || (result.Code == constant.UnknownInternalError) { + rl.logger.Info(result.OutputResultMessage()) continue } - if resultErr.Code == constant.LockSuccess { - rl.logger.Info(resultErr.OutputResultMessage()) + if result.Code == constant.LockSuccess { + rl.logger.Info(result.OutputResultMessage()) + rl.closeSub(sub, rl.subExitChan) return nil } case <-acquireTimer.C: - err := errors.New("the waiting time for obtaining the read lock operation has timed out") rl.logger.Info("the waiting time for obtaining the read lock operation has timed out") - return err + rl.closeSub(sub, rl.subExitChan) + // after acquire lock timeout,notice the sub channel to close + return constant.AcquireTimeoutErr } } } @@ -121,7 +125,7 @@ func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) { rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) } timer.Reset(lockTime) - case <-rl.exit: + case <-rl.refreshExitChan: return } } @@ -153,10 +157,6 @@ func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error { } func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration) error { - if rl.exit == nil { - rl.exit = make(chan struct{}) - } - result := rl.tryWLock(ctx).(*constant.RedisResult) if result.Code == constant.UnknownInternalError { rl.logger.Error(result.OutputResultMessage()) @@ -165,7 +165,11 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration if result.Code == constant.LockSuccess { if rl.needRefresh { - rl.once.Do(func() { + rl.refreshOnce.Do(func() { + if rl.refreshExitChan == nil { + rl.refreshExitChan = make(chan struct{}) + } + // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout(ctx) }) @@ -174,17 +178,19 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration return nil } - subMsg := make(chan struct{}, 1) - defer close(subMsg) - sub := rl.client.Subscribe(ctx, rl.writeWaitChanKey) - defer sub.Close() - go rl.subscribeLock(ctx, sub, subMsg) - if len(timeout) > 0 && timeout[0] > 0 { + if rl.subExitChan == nil { + rl.subExitChan = make(chan struct{}) + } + + subMsgChan := make(chan struct{}, 1) + sub := rl.client.Subscribe(ctx, rl.writeWaitChanKey) + go rl.subscribeLock(sub, subMsgChan) + acquireTimer := time.NewTimer(timeout[0]) for { select { - case _, ok := <-subMsg: + case _, ok := <-subMsgChan: if !ok { err := errors.New("failed to read the write lock waiting for for the channel message") rl.logger.Error("failed to read the read lock waiting for for the channel message") @@ -199,10 +205,13 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration if result.Code == constant.LockSuccess { rl.logger.Info(result.OutputResultMessage()) + rl.closeSub(sub, rl.subExitChan) return nil } case <-acquireTimer.C: rl.logger.Info("the waiting time for obtaining the write lock operation has timed out") + rl.closeSub(sub, rl.subExitChan) + // after acquire lock timeout,notice the sub channel to close return constant.AcquireTimeoutErr } } @@ -276,8 +285,7 @@ func GetRWLocker(client *redis.Client, conf *RedissionLockConfig) *RedissionRWLo needRefresh: conf.NeedRefresh, lockLeaseTime: conf.LockLeaseTime, client: client, - exit: make(chan struct{}), - once: &sync.Once{}, + refreshOnce: &sync.Once{}, logger: logger.GetLoggerInstance(), } diff --git a/distributedlock/rwlock_test.go b/distributedlock/rwlock_test.go index 82bc3d2..341abec 100644 --- a/distributedlock/rwlock_test.go +++ b/distributedlock/rwlock_test.go @@ -21,12 +21,18 @@ var ( func init() { log = zap.Must(zap.NewDevelopment()) rdb = redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - MaxIdleConns: 10, - MaxActiveConns: 40, + Network: "tcp", + Addr: "192.168.2.104:30001", + // pool config + PoolSize: 100, // max connections + PoolFIFO: true, + PoolTimeout: 4 * time.Second, + MinIdleConns: 10, // min idle connections + MaxIdleConns: 20, // max idle connections + // tiemout config + DialTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, }) } @@ -468,10 +474,8 @@ func TestRWLock2CWithRLockAndWLockFailed(t *testing.T) { return } -// TODO 设计两个客户端,C1先加读锁成功与C2后加写锁成功 func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) { ctx := context.TODO() - rdb.Conn() rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, NeedRefresh: true,