optimize the subscription process of redis locker

This commit is contained in:
douxu 2025-04-07 16:49:06 +08:00
parent e4d45016f2
commit fda43c65d2
3 changed files with 97 additions and 106 deletions

View File

@ -34,20 +34,21 @@ type RedissionLockConfig struct {
} }
type redissionLocker struct { type redissionLocker struct {
lockLeaseTime uint64 lockLeaseTime uint64
token string token string
key string key string
waitChanKey string waitChanKey string
needRefresh bool needRefresh bool
exit chan struct{} refreshExitChan chan struct{}
client *redis.Client subExitChan chan struct{}
once *sync.Once client *redis.Client
logger *zap.Logger refreshOnce *sync.Once
logger *zap.Logger
} }
func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) error { func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) error {
if rl.exit == nil { if rl.refreshExitChan == nil {
rl.exit = make(chan struct{}) rl.refreshExitChan = make(chan struct{})
} }
result := rl.tryLock(ctx).(*constant.RedisResult) result := rl.tryLock(ctx).(*constant.RedisResult)
if result.Code == constant.UnknownInternalError { if result.Code == constant.UnknownInternalError {
@ -56,7 +57,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e
} }
if (result.Code == constant.LockSuccess) && rl.needRefresh { if (result.Code == constant.LockSuccess) && rl.needRefresh {
rl.once.Do(func() { rl.refreshOnce.Do(func() {
// async refresh lock timeout unitl receive exit singal // async refresh lock timeout unitl receive exit singal
go rl.refreshLockTimeout(ctx) go rl.refreshLockTimeout(ctx)
}) })
@ -67,7 +68,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e
defer close(subMsg) defer close(subMsg)
sub := rl.client.Subscribe(ctx, rl.waitChanKey) sub := rl.client.Subscribe(ctx, rl.waitChanKey)
defer sub.Close() defer sub.Close()
go rl.subscribeLock(ctx, sub, subMsg) go rl.subscribeLock(sub, subMsg)
if len(timeout) > 0 && timeout[0] > 0 { if len(timeout) > 0 && timeout[0] > 0 {
acquireTimer := time.NewTimer(timeout[0]) acquireTimer := time.NewTimer(timeout[0])
@ -100,53 +101,21 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e
return fmt.Errorf("lock the redis lock failed:%w", result) return fmt.Errorf("lock the redis lock failed:%w", result)
} }
// TODO 优化订阅流程 func (rl *redissionLocker) subscribeLock(sub *redis.PubSub, subMsgChan chan struct{}) {
func (rl *redissionLocker) subscribeLock(ctx context.Context, sub *redis.PubSub, out chan struct{}) { if sub == nil || subMsgChan == nil {
if sub == nil || out == nil {
return return
} }
rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token)) rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token))
// subCh := sub.Channel()
// for msg := range subCh {
// // 这里只会收到真正的数据消息
// fmt.Printf("Channel: %s, Payload: %s\n",
// msg.Channel,
// msg.Payload)
// }
receiveChan := make(chan interface{}, 2)
go func() {
for {
msg, err := sub.Receive(ctx)
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return
}
rl.logger.Error("sub receive message failed", zap.Error(err))
continue
}
rl.logger.Info("sub receive message", zap.Any("msg", msg))
receiveChan <- msg
}
}()
for { for {
select { select {
case <-rl.exit: case <-rl.subExitChan:
close(subMsgChan)
return return
case msg := <-receiveChan: case <-sub.Channel():
switch msg.(type) { // 这里只会收到真正的数据消息
case *redis.Subscription: subMsgChan <- struct{}{}
// Ignore. default:
case *redis.Pong:
// Ignore.
case *redis.Message:
out <- struct{}{}
default:
}
// case <-subCh:
// out <- struct{}{}
} }
} }
} }
@ -183,16 +152,26 @@ func (rl *redissionLocker) refreshLockTimeout(ctx context.Context) {
rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
} }
timer.Reset(lockTime) timer.Reset(lockTime)
case <-rl.exit: case <-rl.refreshExitChan:
return return
} }
} }
} }
func (rl *redissionLocker) cancelRefreshLockTime() { func (rl *redissionLocker) cancelRefreshLockTime() {
if rl.exit != nil { if rl.refreshExitChan != nil {
close(rl.exit) close(rl.refreshExitChan)
rl.once = &sync.Once{} rl.refreshOnce = &sync.Once{}
}
}
func (rl *redissionLocker) closeSub(sub *redis.PubSub, noticeChan chan struct{}) {
if sub != nil {
sub.Close()
}
if noticeChan != nil {
close(noticeChan)
} }
} }
@ -264,13 +243,13 @@ func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker
} }
r := &redissionLocker{ r := &redissionLocker{
token: ops.Token, token: ops.Token,
key: strings.Join([]string{ops.Prefix, ops.Key}, ":"), key: strings.Join([]string{ops.Prefix, ops.Key}, ":"),
waitChanKey: strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":"), waitChanKey: strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":"),
needRefresh: ops.NeedRefresh, needRefresh: ops.NeedRefresh,
client: client, client: client,
exit: make(chan struct{}), refreshExitChan: make(chan struct{}),
logger: logger.GetLoggerInstance(), logger: logger.GetLoggerInstance(),
} }
return r return r
} }

View File

@ -25,10 +25,6 @@ type RedissionRWLocker struct {
} }
func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration) error { func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration) error {
if rl.exit == nil {
rl.exit = make(chan struct{})
}
result := rl.tryRLock(ctx).(*constant.RedisResult) result := rl.tryRLock(ctx).(*constant.RedisResult)
if result.Code == constant.UnknownInternalError { if result.Code == constant.UnknownInternalError {
rl.logger.Error(result.OutputResultMessage()) rl.logger.Error(result.OutputResultMessage())
@ -37,7 +33,11 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
if result.Code == constant.LockSuccess { if result.Code == constant.LockSuccess {
if rl.needRefresh { if rl.needRefresh {
rl.once.Do(func() { rl.refreshOnce.Do(func() {
if rl.refreshExitChan == nil {
rl.refreshExitChan = make(chan struct{})
}
// async refresh lock timeout unitl receive exit singal // async refresh lock timeout unitl receive exit singal
go rl.refreshLockTimeout(ctx) go rl.refreshLockTimeout(ctx)
}) })
@ -46,37 +46,41 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
return nil return nil
} }
subMsg := make(chan struct{}, 1)
defer close(subMsg)
sub := rl.client.Subscribe(ctx, rl.readWaitChanKey)
defer sub.Close()
go rl.subscribeLock(ctx, sub, subMsg)
if len(timeout) > 0 && timeout[0] > 0 { if len(timeout) > 0 && timeout[0] > 0 {
if rl.subExitChan == nil {
rl.subExitChan = make(chan struct{})
}
subMsgChan := make(chan struct{}, 1)
sub := rl.client.Subscribe(ctx, rl.readWaitChanKey)
go rl.subscribeLock(sub, subMsgChan)
acquireTimer := time.NewTimer(timeout[0]) acquireTimer := time.NewTimer(timeout[0])
for { for {
select { select {
case _, ok := <-subMsg: case _, ok := <-subMsgChan:
if !ok { if !ok {
err := errors.New("failed to read the read lock waiting for for the channel message") err := errors.New("failed to read the read lock waiting for for the channel message")
rl.logger.Error("failed to read the read lock waiting for for the channel message") rl.logger.Error("failed to read the read lock waiting for for the channel message")
return err return err
} }
resultErr := rl.tryRLock(ctx).(*constant.RedisResult) result := rl.tryRLock(ctx).(*constant.RedisResult)
if (resultErr.Code == constant.RLockFailureWithWLockOccupancy) || (resultErr.Code == constant.UnknownInternalError) { if (result.Code == constant.RLockFailureWithWLockOccupancy) || (result.Code == constant.UnknownInternalError) {
rl.logger.Info(resultErr.OutputResultMessage()) rl.logger.Info(result.OutputResultMessage())
continue continue
} }
if resultErr.Code == constant.LockSuccess { if result.Code == constant.LockSuccess {
rl.logger.Info(resultErr.OutputResultMessage()) rl.logger.Info(result.OutputResultMessage())
rl.closeSub(sub, rl.subExitChan)
return nil return nil
} }
case <-acquireTimer.C: case <-acquireTimer.C:
err := errors.New("the waiting time for obtaining the read lock operation has timed out")
rl.logger.Info("the waiting time for obtaining the read lock operation has timed out") rl.logger.Info("the waiting time for obtaining the read lock operation has timed out")
return err rl.closeSub(sub, rl.subExitChan)
// after acquire lock timeout,notice the sub channel to close
return constant.AcquireTimeoutErr
} }
} }
} }
@ -121,7 +125,7 @@ func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) {
rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
} }
timer.Reset(lockTime) timer.Reset(lockTime)
case <-rl.exit: case <-rl.refreshExitChan:
return return
} }
} }
@ -153,10 +157,6 @@ func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error {
} }
func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration) error { func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration) error {
if rl.exit == nil {
rl.exit = make(chan struct{})
}
result := rl.tryWLock(ctx).(*constant.RedisResult) result := rl.tryWLock(ctx).(*constant.RedisResult)
if result.Code == constant.UnknownInternalError { if result.Code == constant.UnknownInternalError {
rl.logger.Error(result.OutputResultMessage()) rl.logger.Error(result.OutputResultMessage())
@ -165,7 +165,11 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
if result.Code == constant.LockSuccess { if result.Code == constant.LockSuccess {
if rl.needRefresh { if rl.needRefresh {
rl.once.Do(func() { rl.refreshOnce.Do(func() {
if rl.refreshExitChan == nil {
rl.refreshExitChan = make(chan struct{})
}
// async refresh lock timeout unitl receive exit singal // async refresh lock timeout unitl receive exit singal
go rl.refreshLockTimeout(ctx) go rl.refreshLockTimeout(ctx)
}) })
@ -174,17 +178,19 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
return nil return nil
} }
subMsg := make(chan struct{}, 1)
defer close(subMsg)
sub := rl.client.Subscribe(ctx, rl.writeWaitChanKey)
defer sub.Close()
go rl.subscribeLock(ctx, sub, subMsg)
if len(timeout) > 0 && timeout[0] > 0 { if len(timeout) > 0 && timeout[0] > 0 {
if rl.subExitChan == nil {
rl.subExitChan = make(chan struct{})
}
subMsgChan := make(chan struct{}, 1)
sub := rl.client.Subscribe(ctx, rl.writeWaitChanKey)
go rl.subscribeLock(sub, subMsgChan)
acquireTimer := time.NewTimer(timeout[0]) acquireTimer := time.NewTimer(timeout[0])
for { for {
select { select {
case _, ok := <-subMsg: case _, ok := <-subMsgChan:
if !ok { if !ok {
err := errors.New("failed to read the write lock waiting for for the channel message") err := errors.New("failed to read the write lock waiting for for the channel message")
rl.logger.Error("failed to read the read lock waiting for for the channel message") rl.logger.Error("failed to read the read lock waiting for for the channel message")
@ -199,10 +205,13 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
if result.Code == constant.LockSuccess { if result.Code == constant.LockSuccess {
rl.logger.Info(result.OutputResultMessage()) rl.logger.Info(result.OutputResultMessage())
rl.closeSub(sub, rl.subExitChan)
return nil return nil
} }
case <-acquireTimer.C: case <-acquireTimer.C:
rl.logger.Info("the waiting time for obtaining the write lock operation has timed out") rl.logger.Info("the waiting time for obtaining the write lock operation has timed out")
rl.closeSub(sub, rl.subExitChan)
// after acquire lock timeout,notice the sub channel to close
return constant.AcquireTimeoutErr return constant.AcquireTimeoutErr
} }
} }
@ -276,8 +285,7 @@ func GetRWLocker(client *redis.Client, conf *RedissionLockConfig) *RedissionRWLo
needRefresh: conf.NeedRefresh, needRefresh: conf.NeedRefresh,
lockLeaseTime: conf.LockLeaseTime, lockLeaseTime: conf.LockLeaseTime,
client: client, client: client,
exit: make(chan struct{}), refreshOnce: &sync.Once{},
once: &sync.Once{},
logger: logger.GetLoggerInstance(), logger: logger.GetLoggerInstance(),
} }

View File

@ -21,12 +21,18 @@ var (
func init() { func init() {
log = zap.Must(zap.NewDevelopment()) log = zap.Must(zap.NewDevelopment())
rdb = redis.NewClient(&redis.Options{ rdb = redis.NewClient(&redis.Options{
Network: "tcp", Network: "tcp",
Addr: "192.168.2.104:30001", Addr: "192.168.2.104:30001",
PoolSize: 50, // pool config
DialTimeout: 10 * time.Second, PoolSize: 100, // max connections
MaxIdleConns: 10, PoolFIFO: true,
MaxActiveConns: 40, PoolTimeout: 4 * time.Second,
MinIdleConns: 10, // min idle connections
MaxIdleConns: 20, // max idle connections
// tiemout config
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}) })
} }
@ -468,10 +474,8 @@ func TestRWLock2CWithRLockAndWLockFailed(t *testing.T) {
return return
} }
// TODO 设计两个客户端,C1先加读锁成功与C2后加写锁成功
func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) { func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
rdb.Conn()
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
LockLeaseTime: 120, LockLeaseTime: 120,
NeedRefresh: true, NeedRefresh: true,