From 09225fc96fe64830ba3cbd1fceb9c75485dcbe0e Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 6 Mar 2025 16:35:36 +0800 Subject: [PATCH] optimize structer of redisRWLock and acquisition statements of write lock --- distributedlock/constant/redis_err.go | 88 ++++--- distributedlock/luascript/rlock_script.go | 18 +- distributedlock/redis_lock.go | 11 +- distributedlock/redis_rwlock.go | 293 +++++++++------------- 4 files changed, 179 insertions(+), 231 deletions(-) diff --git a/distributedlock/constant/redis_err.go b/distributedlock/constant/redis_err.go index 69579dd..875df2f 100644 --- a/distributedlock/constant/redis_err.go +++ b/distributedlock/constant/redis_err.go @@ -1,23 +1,25 @@ package constant -import "fmt" +import ( + "fmt" +) -type RedisResult int +type RedisCode int const ( - LockSuccess = RedisResult(1) - UnLockSuccess = RedisResult(1) - RefreshLockSuccess = RedisResult(1) - UnRLockSuccess = RedisResult(0) - RLockFailure = RedisResult(-1) - UnRLockFailureWithWLockOccupancy = RedisResult(-2) - WLockFailureWithRLockOccupancy = RedisResult(-3) - WLockFailureWithWLockOccupancy = RedisResult(-4) - UnWLockFailureWithRLockOccupancy = RedisResult(-5) - UnWLockFailureWithWLockOccupancy = RedisResult(-6) - WLockFailureWithNotFirstPriority = RedisResult(-7) - RefreshLockFailure = RedisResult(-8) - UnknownInternalError = RedisResult(-99) + LockSuccess = RedisCode(1) + UnLockSuccess = RedisCode(1) + RefreshLockSuccess = RedisCode(1) + UnRLockSuccess = RedisCode(0) + RLockFailureWithWLockOccupancy = RedisCode(-1) + UnRLockFailureWithWLockOccupancy = RedisCode(-2) + WLockFailureWithRLockOccupancy = RedisCode(-3) + WLockFailureWithWLockOccupancy = RedisCode(-4) + UnWLockFailureWithRLockOccupancy = RedisCode(-5) + UnWLockFailureWithWLockOccupancy = RedisCode(-6) + WLockFailureWithNotFirstPriority = RedisCode(-7) + RefreshLockFailure = RedisCode(-8) + UnknownInternalError = RedisCode(-99) ) type RedisLockType int @@ -28,68 +30,64 @@ const ( RefreshLockType ) -type RedisError struct { - Code RedisResult +type RedisResult struct { + Code RedisCode Message string } -func (e *RedisError) Error() string { +func (e *RedisResult) Error() string { return fmt.Sprintf("redis execution code:%d,message:%s\n", e.Code, e.Message) } -func (e *RedisError) OutputResultMessage() string { +func (e *RedisResult) OutputResultMessage() string { return e.Message } -func (e *RedisError) OutputResultCode() int { +func (e *RedisResult) OutputResultCode() int { return int(e.Code) } -func NewRedisError(res RedisResult) error { - resInt := int(res) - switch resInt { - case -1: - return &RedisError{Code: -1, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"} - default: - return nil - } -} - -func ConvertResultToErr(res RedisResult, lockType RedisLockType, redisMsg string) error { +func NewRedisResult(res RedisCode, lockType RedisLockType, redisMsg string) error { resInt := int(res) switch resInt { case 1: if lockType == LockType { - return &RedisError{Code: res, Message: "redis lock success"} + return &RedisResult{Code: res, Message: "redis lock success"} } else if lockType == UnLockType { - return &RedisError{Code: res, Message: "redis unlock success"} + return &RedisResult{Code: res, Message: "redis unlock success"} } else { - return &RedisError{Code: res, Message: "redis refresh lock success"} + return &RedisResult{Code: res, Message: "redis refresh lock success"} } case 0: - return &RedisError{Code: res, Message: "redis unlock read lock success, the lock is still occupied by other processes read lock"} + return &RedisResult{Code: res, Message: "redis unlock read lock success, the lock is still occupied by other processes read lock"} case -1: - return &RedisError{Code: res, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"} + return &RedisResult{Code: res, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"} case -2: - return &RedisError{Code: res, Message: "redis un lock read lock failure,the lock is already occupied by another processes write lock"} + return &RedisResult{Code: res, Message: "redis un lock read lock failure,the lock is already occupied by another processes write lock"} case -3: - return &RedisError{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes read lock"} + return &RedisResult{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes read lock"} case -4: - return &RedisError{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes write lock"} + return &RedisResult{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes write lock"} case -5: - return &RedisError{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes read lock"} + return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes read lock"} case -6: - return &RedisError{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes write lock"} + return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes write lock"} case -7: - return &RedisError{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"} + return &RedisResult{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"} case -8: - return &RedisError{Code: res, Message: "redis refresh lock failure,the lock not exist"} + return &RedisResult{Code: res, Message: "redis refresh lock failure,the lock not exist"} + case -99: + return &RedisResult{Code: res, Message: "redis internal execution error"} default: - return &RedisError{Code: res, Message: fmt.Sprintf("unkown redis execution result:%s\n", redisMsg)} + msg := "unkown redis execution result" + if redisMsg != "" { + msg = fmt.Sprintf("%s:%s\n", msg, redisMsg) + } + return &RedisResult{Code: res, Message: msg} } } -func TranslateResultToStr(res RedisResult, lockType RedisLockType) string { +func TranslateResultToStr(res RedisCode, lockType RedisLockType) string { resInt := int(res) switch resInt { case 1: diff --git a/distributedlock/luascript/rlock_script.go b/distributedlock/luascript/rlock_script.go index 3588670..278ea9c 100644 --- a/distributedlock/luascript/rlock_script.go +++ b/distributedlock/luascript/rlock_script.go @@ -20,9 +20,9 @@ if (mode == false) then end; if (mode == 'write') then - -- TODO 放到 list 中等待写锁释放后再次尝试加锁并且订阅写锁释放的消息 - local key = KEYS[1] .. ':read'; - redis.call('rpush', key, ARGV[2]); + -- 放到 list 中等待写锁释放后再次尝试加锁并且订阅写锁释放的消息 + local waitKey = KEYS[1] .. ':read'; + redis.call('rpush', waitKey, ARGV[2]); return -1; end; @@ -60,8 +60,7 @@ end; /* KEYS[1]:锁的键名(key),通常是锁的唯一标识。 KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -KEYS[3]:锁的释放通知读频道(chankey),用于通知其他客户端锁已释放。 -KEYS[4]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 +KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ @@ -74,8 +73,6 @@ if (mode == false) then local counter = redis.call('llen',writeWait) if (counter >= 1) then redis.call('publish', KEYS[4], ARGV[1]); - else - redis.call('publish', KEYS[3], ARGV[1]); end; return 1; elseif (mode == 'write') then @@ -150,7 +147,7 @@ if (mode == false) then redis.call('lpop', waitKey, '1') return 1; elseif (mode == 'read') then - -- TODO 放到 list 中等待读锁释放后再次尝试加锁并且订阅读锁释放的消息 + -- 放到 list 中等待读锁释放后再次尝试加锁并且订阅读锁释放的消息 redis.call('rpush', waitkey, ARGV[2]); return -3; else @@ -174,8 +171,7 @@ end; /* KEYS[1]:锁的键名(key),通常是锁的唯一标识。 KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -KEYS[3]:锁的释放通知读频道(chankey),用于通知其他客户端锁已释放。 -KEYS[4]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 +KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ @@ -186,8 +182,6 @@ if (mode == false) then -- 优先写锁加锁,无写锁的情况通知读锁加锁 local counter = redis.call('llen',writeWait) if (counter >= 1) then - redis.call('publish', KEYS[4], ARGV[1]); - else redis.call('publish', KEYS[3], ARGV[1]); end; return 1; diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index 6f9b9aa..6cc7a21 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -57,7 +57,7 @@ var unlockScript string = strings.Join([]string{ }, "") const ( - internalLockLeaseTime = uint64(30) * 1000 + internalLockLeaseTime = uint64(30) unlockMessage = 0 ) @@ -71,7 +71,7 @@ type RedissionLockConfig struct { type redissionLocker struct { token string key string - waitChankey string + waitChanKey string exit chan struct{} lockLeaseTime uint64 client *redis.Client @@ -98,7 +98,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) { submsg := make(chan struct{}, 1) defer close(submsg) - sub := rl.client.Subscribe(rl.waitChankey) + sub := rl.client.Subscribe(rl.waitChanKey) defer sub.Close() go rl.subscribeLock(sub, submsg) // listen := rl.listenManager.Subscribe(rl.key, rl.token) @@ -242,7 +242,6 @@ LOOP: func (rl *redissionLocker) cancelRefreshLockTime() { if rl.exit != nil { close(rl.exit) - rl.exit = nil rl.once = &sync.Once{} } } @@ -262,7 +261,7 @@ func (rl *redissionLocker) tryLock() (time.Duration, error) { } func (rl *redissionLocker) UnLock() { - res := rl.client.Eval(unlockScript, []string{rl.key, rl.waitChankey}, unlockMessage, rl.lockLeaseTime, rl.token) + res := rl.client.Eval(unlockScript, []string{rl.key, rl.waitChanKey}, unlockMessage, rl.lockLeaseTime, rl.token) val, err := res.Result() if err != redis.Nil && err != nil { panic(err) @@ -294,6 +293,6 @@ func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker r.lockLeaseTime = internalLockLeaseTime } r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") - r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":") + r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":") return r } diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index d99c467..5a35133 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -1,11 +1,9 @@ package distributed_lock import ( - "context" "errors" "fmt" "strings" - "sync" "time" "modelRT/distributedlock/constant" @@ -17,26 +15,25 @@ import ( "go.uber.org/zap" ) -type redissionReadLocker struct { +type RedissionRWLocker struct { redissionLocker - rwTimeoutPrefix string - prefixKey string - needRefresh bool + writeWaitChanKey string + rwTimeoutPrefix string + needRefresh bool } -// TODO 将参数中的 ctx 优化掉 -func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duration) error { +func (rl *RedissionRWLocker) RLock(timeout ...time.Duration) error { if rl.exit == nil { rl.exit = make(chan struct{}) } - resultErr := rl.tryLock().(*constant.RedisError) - if resultErr.Code == constant.UnknownInternalError { - rl.logger.Error(resultErr.OutputResultMessage()) - return fmt.Errorf("get read lock failed:%w", resultErr) + result := rl.tryRLock().(*constant.RedisResult) + if result.Code == constant.UnknownInternalError { + rl.logger.Error(result.OutputResultMessage()) + return fmt.Errorf("get read lock failed:%w", result) } - if (resultErr.Code == constant.LockSuccess) && rl.needRefresh { + if (result.Code == constant.LockSuccess) && rl.needRefresh { rl.once.Do(func() { // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout() @@ -44,21 +41,17 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio return nil } - var acquireTimer *time.Timer - if len(timeout) > 0 && timeout[0] > 0 { - acquireTimer = time.NewTimer(timeout[0]) - } - subMsg := make(chan struct{}, 1) defer close(subMsg) - sub := rl.client.Subscribe(rl.waitChankey) + sub := rl.client.Subscribe(rl.writeWaitChanKey) defer sub.Close() go rl.subscribeLock(sub, subMsg) if len(timeout) > 0 && timeout[0] > 0 { - acquireTimer = time.NewTimer(timeout[0]) + acquireTimer := time.NewTimer(timeout[0]) for { select { + case _, ok := <-subMsg: if !ok { err := errors.New("failed to read the read lock waiting for for the channel message") @@ -66,8 +59,8 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio return err } - resultErr := rl.tryLock().(*constant.RedisError) - if (resultErr.Code == constant.RLockFailure) || (resultErr.Code == constant.UnknownInternalError) { + resultErr := rl.tryRLock().(*constant.RedisResult) + if (resultErr.Code == constant.RLockFailureWithWLockOccupancy) || (resultErr.Code == constant.UnknownInternalError) { rl.logger.Info(resultErr.OutputResultMessage()) continue } @@ -83,229 +76,193 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio } } } - return fmt.Errorf("get read lock failed:%w", constant.NewRedisError(constant.RLockFailure)) + return fmt.Errorf("lock read lock failed:%w", result) } -func (rl *redissionReadLocker) tryLock() error { +func (rl *RedissionRWLocker) tryRLock() error { lockType := constant.LockType + res := rl.client.Eval(luascript.RLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) - v, err := res.Result() + val, err := res.Int() if err != redis.Nil && err != nil { - return constant.ConvertResultToErr(constant.UnknownInternalError, lockType, err.Error()) + return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error()) } - return constant.ConvertResultToErr(v.(constant.RedisResult), lockType, "") + return constant.NewRedisResult(constant.RedisCode(val), lockType, "") } -func (rl *redissionReadLocker) refreshLockTimeout() { - rl.logger.Debug("rlock: %s lock %s\n", zap.String("token", rl.token), zap.String("key", rl.key)) - lockTime := time.Duration(rl.lockLeaseTime/3) * time.Millisecond +func (rl *RedissionRWLocker) refreshLockTimeout() { + rl.logger.Info("read lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + + lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second timer := time.NewTimer(lockTime) defer timer.Stop() -LOOP: + for { select { case <-timer.C: - timer.Reset(lockTime) - // update key expire time - res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.prefixKey}, rl.lockLeaseTime, rl.token) + // extend key lease time + res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) val, err := res.Int() - if err != nil { - panic(err) + if err != redis.Nil && err != nil { + rl.logger.Info("read lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + return } - if val == 0 { - rl.logger.Debug("not find the rlock key of self") - break LOOP - } - case <-rl.exit: - break LOOP + if constant.RedisCode(val) == constant.RefreshLockFailure { + rl.logger.Error("read lock refreash failed,can not find the read lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + break + } + + if constant.RedisCode(val) == constant.RefreshLockSuccess { + rl.logger.Info("read lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + } + timer.Reset(lockTime) + case <-rl.exit: + break } } - rl.logger.Debug("rlock: refresh routine release", zap.String("token", rl.token)) } -func (rl *redissionReadLocker) UnLock() { - res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.waitChankey, rl.rwTimeoutPrefix, rl.prefixKey}, unlockMessage, rl.token) - val, err := res.Result() +func (rl *RedissionRWLocker) UnRLock() error { + res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.token) + val, err := res.Int() if err != redis.Nil && err != nil { - panic(err) + rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.LockType, err.Error())) } - if val == nil { - panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token) + + if (constant.RedisCode(val) == constant.UnLockSuccess) || (constant.RedisCode(val) == constant.UnRLockSuccess) { + if rl.needRefresh { + rl.cancelRefreshLockTime() + } + + rl.logger.Info("unlock read lock success", zap.String("token", rl.token), zap.String("key", rl.key)) + return nil } - rl.logger.Debug("lock: %s unlock %s\n", zap.String("token", rl.token), zap.String("key", rl.key)) - if val.(int64) == 1 { - rl.cancelRefreshLockTime() + + if constant.RedisCode(val) == constant.UnRLockFailureWithWLockOccupancy { + rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key)) + return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnRLockFailureWithWLockOccupancy, constant.UnLockType, "")) } + return nil } -type redissionWriteLocker struct { - redissionLocker -} - -func (rl *redissionWriteLocker) Lock(ctx context.Context, timeout ...time.Duration) { +func (rl *RedissionRWLocker) WLock(timeout ...time.Duration) error { if rl.exit == nil { rl.exit = make(chan struct{}) } - ttl, err := rl.tryLock() - if err != nil { - panic(err) + + result := rl.tryWLock().(*constant.RedisResult) + if result.Code == constant.UnknownInternalError { + rl.logger.Error(result.OutputResultMessage()) + return fmt.Errorf("get write lock failed:%w", result) } - if ttl <= 0 { + if (result.Code == constant.LockSuccess) && rl.needRefresh { rl.once.Do(func() { // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout() }) - return + return nil } - submsg := make(chan struct{}, 1) - defer close(submsg) - sub := rl.client.Subscribe(rl.waitChankey) + subMsg := make(chan struct{}, 1) + defer close(subMsg) + sub := rl.client.Subscribe(rl.writeWaitChanKey) defer sub.Close() - go rl.subscribeLock(sub, submsg) - // listen := rl.listenManager.Subscribe(rl.key, rl.token) - // defer rl.listenManager.UnSubscribe(rl.key, rl.token) + go rl.subscribeLock(sub, subMsg) - timer := time.NewTimer(ttl) - defer timer.Stop() - // outimer 理解为如果超过这个时间没有获取到锁,就直接放弃 - var outimer *time.Timer if len(timeout) > 0 && timeout[0] > 0 { - outimer = time.NewTimer(timeout[0]) - } -LOOP: - for { - ttl, err = rl.tryLock() - if err != nil { - panic(err) - } - - if ttl <= 0 { - rl.once.Do(func() { - go rl.refreshLockTimeout() - }) - return - } - if outimer != nil { + acquireTimer := time.NewTimer(timeout[0]) + for { select { - case _, ok := <-submsg: - if !timer.Stop() { - <-timer.C - } - + case _, ok := <-subMsg: if !ok { - panic("lock listen release") + err := errors.New("failed to read the write lock waiting for for the channel message") + rl.logger.Error("failed to read the read lock waiting for for the channel message") + return err } - timer.Reset(ttl) - case <-ctx.Done(): - // break LOOP - panic("lock context already release") - case <-timer.C: - timer.Reset(ttl) - case <-outimer.C: - if !timer.Stop() { - <-timer.C - } - break LOOP - } - } else { - select { - case _, ok := <-submsg: - if !timer.Stop() { - <-timer.C + result := rl.tryWLock().(*constant.RedisResult) + if (result.Code == constant.UnknownInternalError) || (result.Code == constant.WLockFailureWithRLockOccupancy) || (result.Code == constant.WLockFailureWithWLockOccupancy) || (result.Code == constant.WLockFailureWithNotFirstPriority) { + rl.logger.Info(result.OutputResultMessage()) + continue } - if !ok { - panic("lock listen release") + if result.Code == constant.LockSuccess { + rl.logger.Info(result.OutputResultMessage()) + return nil } - - timer.Reset(ttl) - case <-ctx.Done(): - // break LOOP - panic("lock context already release") - case <-timer.C: - timer.Reset(ttl) + case <-acquireTimer.C: + err := errors.New("the waiting time for obtaining the write lock operation has timed out") + rl.logger.Info("the waiting time for obtaining the write lock operation has timed out") + return err } } } + return fmt.Errorf("lock write lock failed:%w", result) } -func (rl *redissionWriteLocker) tryLock() (time.Duration, error) { - res := rl.client.Eval(luascript.WLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token) - v, err := res.Result() +func (rl *RedissionRWLocker) tryWLock() error { + lockType := constant.LockType + + res := rl.client.Eval(luascript.WLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) + val, err := res.Int() if err != redis.Nil && err != nil { - return 0, err + return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error()) } - - if v == nil { - return 0, nil - } - - return time.Duration(v.(int64)), nil + return constant.NewRedisResult(constant.RedisCode(val), lockType, "") } -func (rl *redissionWriteLocker) UnLock() { - res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.waitChankey}, unlockMessage, rl.lockLeaseTime, rl.token) - val, err := res.Result() +func (rl *RedissionRWLocker) UnWLock() error { + res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) + val, err := res.Int() if err != redis.Nil && err != nil { - panic(err) + rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnLockType, err.Error())) } - if val == nil { - panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token) + + if constant.RedisCode(val) == constant.UnLockSuccess { + if rl.needRefresh { + rl.cancelRefreshLockTime() + } + rl.logger.Info("unlock write lock success", zap.String("token", rl.token), zap.String("key", rl.key)) + return nil } - rl.logger.Debug("lock: unlock", zap.String("token", rl.token), zap.String("key", rl.key)) - if val.(int64) == 1 { - rl.cancelRefreshLockTime() + + if (constant.RedisCode(val) == constant.UnWLockFailureWithRLockOccupancy) || (constant.RedisCode(val) == constant.UnWLockFailureWithWLockOccupancy) { + rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key)) + return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.RedisCode(val), constant.UnLockType, "")) } + return nil } -func GetReadLocker(client *redis.Client, ops *RedissionLockConfig) *redissionReadLocker { +func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLocker { r := &redissionLocker{ token: uuid.New().String(), client: client, exit: make(chan struct{}), - once: &sync.Once{}, - } - - if len(ops.Prefix) <= 0 { - ops.Prefix = "redission-rwlock" - } - - if len(ops.ChanPrefix) <= 0 { - ops.ChanPrefix = "redission-rwlock-channel" - } - - if ops.LockLeaseTime == 0 { - r.lockLeaseTime = internalLockLeaseTime - } - r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") - r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":") - tkey := strings.Join([]string{"{", r.key, "}"}, "") - return &redissionReadLocker{redissionLocker: *r, rwTimeoutPrefix: strings.Join([]string{tkey, r.token, "rwlock_timeout"}, ":"), prefixKey: tkey, needRefresh: true} -} - -func GetWriteLocker(client *redis.Client, ops *RedissionLockConfig) *redissionWriteLocker { - r := &redissionLocker{ - token: uuid.New().String(), - client: client, - exit: make(chan struct{}), - once: &sync.Once{}, logger: logger.GetLoggerInstance(), } if len(ops.Prefix) <= 0 { ops.Prefix = "redission-rwlock" } + if len(ops.ChanPrefix) <= 0 { ops.ChanPrefix = "redission-rwlock-channel" } + if ops.LockLeaseTime == 0 { r.lockLeaseTime = internalLockLeaseTime } r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") - r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":") - return &redissionWriteLocker{redissionLocker: *r} + + rwLocker := &RedissionRWLocker{ + redissionLocker: *r, + writeWaitChanKey: strings.Join([]string{r.key, "write"}, ":"), + rwTimeoutPrefix: "rwlock_timeout", + needRefresh: true, + } + return rwLocker }