From 7e3d94db4bd35576a84e94d10fb7af0ac49711a6 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 7 Mar 2025 16:16:26 +0800 Subject: [PATCH] optimize structer of redisLock and acquisition statements of lock --- .../{redis_err.go => redis_result.go} | 4 + distributedlock/luascript/lock_script.go | 62 ++++ .../{rlock_script.go => rwlock_script.go} | 34 +-- distributedlock/redis_lock.go | 271 +++++++----------- distributedlock/redis_rwlock.go | 51 ++-- 5 files changed, 218 insertions(+), 204 deletions(-) rename distributedlock/constant/{redis_err.go => redis_result.go} (95%) create mode 100644 distributedlock/luascript/lock_script.go rename distributedlock/luascript/{rlock_script.go => rwlock_script.go} (86%) diff --git a/distributedlock/constant/redis_err.go b/distributedlock/constant/redis_result.go similarity index 95% rename from distributedlock/constant/redis_err.go rename to distributedlock/constant/redis_result.go index 875df2f..d389a39 100644 --- a/distributedlock/constant/redis_err.go +++ b/distributedlock/constant/redis_result.go @@ -19,6 +19,8 @@ const ( UnWLockFailureWithWLockOccupancy = RedisCode(-6) WLockFailureWithNotFirstPriority = RedisCode(-7) RefreshLockFailure = RedisCode(-8) + LockFailure = RedisCode(-9) + UnLocakFailureWithLockOccupancy = RedisCode(-10) UnknownInternalError = RedisCode(-99) ) @@ -76,6 +78,8 @@ func NewRedisResult(res RedisCode, lockType RedisLockType, redisMsg string) erro return &RedisResult{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"} case -8: return &RedisResult{Code: res, Message: "redis refresh lock failure,the lock not exist"} + case -9: + return &RedisResult{Code: res, Message: "redis lock failure,the lock is already occupied by another processes lock"} case -99: return &RedisResult{Code: res, Message: "redis internal execution error"} default: diff --git a/distributedlock/luascript/lock_script.go b/distributedlock/luascript/lock_script.go new file mode 100644 index 0000000..64a6be8 --- /dev/null +++ b/distributedlock/luascript/lock_script.go @@ -0,0 +1,62 @@ +package luascript + +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ +var LockScript = ` +-- 锁不存在的情况下加锁 +if (redis.call('exists', KEYS[1]) == 0) then + redis.call('hset', KEYS[1], ARGV[2], 1); + redis.call('expire', KEYS[1], ARGV[1]); + return 1; +end; +-- 重入锁逻辑 +if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then + redis.call('hincrby', KEYS[1], ARGV[2], 1); + redis.call('expire', KEYS[1], ARGV[1]); + return 1; +end; +-- 持有锁的 token 不是当前客户端的 token,返回加锁失败 +return -9; +` + +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ +var RefreshLockScript = ` +if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then + redis.call('expire', KEYS[1], ARGV[1]); + return 1; +end; +return -8; +` + +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的释放通知频道(chankey),用于通知其他客户端锁已释放。 +ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ +var UnLockScript = ` +if (redis.call('exists', KEYS[1]) == 0) then + redis.call('publish', KEYS[2], ARGV[1]); + return 1; +end; +if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then + return 1; +end; +local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); +if (counter > 0) then + return 1; +else + redis.call('del', KEYS[1]); + redis.call('publish', KEYS[2], ARGV[1]); + return 1; +end; +-- 持有锁的 token 不是当前客户端的 token,返回解锁失败 +return -10; +` diff --git a/distributedlock/luascript/rlock_script.go b/distributedlock/luascript/rwlock_script.go similarity index 86% rename from distributedlock/luascript/rlock_script.go rename to distributedlock/luascript/rwlock_script.go index 278ea9c..814c559 100644 --- a/distributedlock/luascript/rlock_script.go +++ b/distributedlock/luascript/rwlock_script.go @@ -3,10 +3,10 @@ package luascript // RLockScript is the lua script for the lock read lock command /* -KEYS[1]:锁的键名(key),通常是锁的唯一标识。 -KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 -ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ var RLockScript = ` local mode = redis.call('hget', KEYS[1], 'mode'); @@ -126,10 +126,10 @@ end; // WLockScript is the lua script for the lock write lock command /* -KEYS[1]:锁的键名(key),通常是锁的唯一标识。 -KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 -ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ var WLockScript = ` local mode = redis.call('hget', KEYS[1], 'mode'); @@ -169,11 +169,11 @@ end; // UnWLockScript is the lua script for the unlock write lock command /* -KEYS[1]:锁的键名(key),通常是锁的唯一标识。 -KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 -ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ var UnWLockScript = ` local mode = redis.call('hget', KEYS[1], 'mode'); @@ -208,14 +208,14 @@ else end; ` -// RefreshLockScript is the lua script for the refresh lock command +// RefreshRWLockScript is the lua script for the refresh lock command /* -KEYS[1]:锁的键名(key),通常是锁的唯一标识。 -KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 -ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ -var RefreshLockScript = ` +var RefreshRWLockScript = ` local lockKey = KEYS[2] .. ':' .. ARGV[2] local lockExists = redis.call('hexists', KEYS[1], lockKey); local mode = redis.call('hget', KEYS[1], 'mode') diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index 6cc7a21..9653589 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -1,61 +1,21 @@ package distributed_lock import ( - "context" + "errors" "fmt" "strings" "sync" "time" + "modelRT/distributedlock/constant" luascript "modelRT/distributedlock/luascript" + "modelRT/logger" "github.com/go-redis/redis" uuid "github.com/google/uuid" "go.uber.org/zap" ) -var lockScript string = strings.Join([]string{ - "if (redis.call('exists', KEYS[1]) == 0) then ", - "redis.call('hset', KEYS[1], ARGV[2], 1); ", - "redis.call('pexpire', KEYS[1], ARGV[1]); ", - "return nil; ", - "end; ", - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then ", - "redis.call('hincrby', KEYS[1], ARGV[2], 1); ", - "redis.call('pexpire', KEYS[1], ARGV[1]); ", - "return nil; ", - "end; ", - "return redis.call('pttl', KEYS[1]);", -}, "") - -var refreshLockScript string = strings.Join([]string{ - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then ", - "redis.call('pexpire', KEYS[1], ARGV[1]); ", - "return 1; ", - "end; ", - "return 0;", -}, "") - -var unlockScript string = strings.Join([]string{ - "if (redis.call('exists', KEYS[1]) == 0) then ", - "redis.call('publish', KEYS[2], ARGV[1]); ", - "return 1; ", - "end;", - "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then ", - "return nil;", - "end; ", - "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); ", - "if (counter > 0) then ", - "redis.call('pexpire', KEYS[1], ARGV[2]); ", - "return 0; ", - "else ", - "redis.call('del', KEYS[1]); ", - "redis.call('publish', KEYS[2], ARGV[1]); ", - "return 1; ", - "end; ", - "return nil;", -}, "") - const ( internalLockLeaseTime = uint64(30) unlockMessage = 0 @@ -65,138 +25,95 @@ type RedissionLockConfig struct { LockLeaseTime time.Duration Prefix string ChanPrefix string + TimeoutPrefix string Key string } type redissionLocker struct { + lockLeaseTime uint64 token string key string waitChanKey string + needRefresh bool exit chan struct{} - lockLeaseTime uint64 client *redis.Client once *sync.Once logger *zap.Logger } -func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) { - fmt.Println(luascript.RLockScript) +func (rl *redissionLocker) Lock(timeout ...time.Duration) error { if rl.exit == nil { rl.exit = make(chan struct{}) } - ttl, err := rl.tryLock() - if err != nil { - panic(err) + result := rl.tryLock().(*constant.RedisResult) + if result.Code == constant.UnknownInternalError { + rl.logger.Error(result.OutputResultMessage()) + return fmt.Errorf("get lock failed:%w", result) } - if ttl <= 0 { + if (result.Code == constant.LockSuccess) && rl.needRefresh { rl.once.Do(func() { + // async refresh lock timeout unitl receive exit singal go rl.refreshLockTimeout() }) - return + return nil } - submsg := make(chan struct{}, 1) - defer close(submsg) + subMsg := make(chan struct{}, 1) + defer close(subMsg) sub := rl.client.Subscribe(rl.waitChanKey) defer sub.Close() - go rl.subscribeLock(sub, submsg) - // listen := rl.listenManager.Subscribe(rl.key, rl.token) - // defer rl.listenManager.UnSubscribe(rl.key, rl.token) + go rl.subscribeLock(sub, subMsg) - timer := time.NewTimer(ttl) - defer timer.Stop() - // outimer 的作用理解为如果超过多长时间无法获得这个锁,那么就直接放弃 - var outimer *time.Timer if len(timeout) > 0 && timeout[0] > 0 { - outimer = time.NewTimer(timeout[0]) - } -LOOP: - for { - ttl, err = rl.tryLock() - if err != nil { - panic(err) - } - - if ttl <= 0 { - rl.once.Do(func() { - go rl.refreshLockTimeout() - }) - return - } - if outimer != nil { + acquireTimer := time.NewTimer(timeout[0]) + for { select { - case _, ok := <-submsg: - if !timer.Stop() { - <-timer.C - } + case _, ok := <-subMsg: if !ok { - panic("lock listen release") + err := errors.New("failed to read the lock waiting for for the channel message") + rl.logger.Error("failed to read the lock waiting for for the channel message") + return err } - timer.Reset(ttl) - case <-ctx.Done(): - // break LOOP - panic("lock context already release") - case <-timer.C: - timer.Reset(ttl) - case <-outimer.C: - if !timer.Stop() { - <-timer.C - } - break LOOP - } - } else { - select { - case _, ok := <-submsg: - if !timer.Stop() { - <-timer.C + resultErr := rl.tryLock().(*constant.RedisResult) + if (resultErr.Code == constant.LockFailure) || (resultErr.Code == constant.UnknownInternalError) { + rl.logger.Info(resultErr.OutputResultMessage()) + continue } - if !ok { - panic("lock listen release") + if resultErr.Code == constant.LockSuccess { + rl.logger.Info(resultErr.OutputResultMessage()) + return nil } - - timer.Reset(ttl) - case <-ctx.Done(): - // break LOOP - panic("lock context already release") - case <-timer.C: - timer.Reset(ttl) + case <-acquireTimer.C: + err := errors.New("the waiting time for obtaining the lock operation has timed out") + rl.logger.Info("the waiting time for obtaining the lock operation has timed out") + return err } } } + return fmt.Errorf("lock the redis lock failed:%w", result) } func (rl *redissionLocker) subscribeLock(sub *redis.PubSub, out chan struct{}) { - defer func() { - if err := recover(); err != nil { - rl.logger.Error("subscribeLock catch error", zap.Error(err.(error))) - } - }() if sub == nil || out == nil { return } - rl.logger.Debug("lock:%s enter sub routine", zap.String("token", rl.token)) -LOOP: + rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token)) + for { msg, err := sub.Receive() if err != nil { - rl.logger.Info("sub receive message", zap.Error(err)) - break LOOP + rl.logger.Info("sub receive message failed", zap.Error(err)) + continue } select { case <-rl.exit: - break LOOP + break default: - if len(out) > 0 { - // if channel hava msg. drop it - rl.logger.Debug("drop message when channel if full") - continue - } - switch msg.(type) { case *redis.Subscription: // Ignore. @@ -208,35 +125,44 @@ LOOP: } } } - rl.logger.Debug("lock sub routine release", zap.String("token", rl.token)) } +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ func (rl *redissionLocker) refreshLockTimeout() { - rl.logger.Debug("lock", zap.String("token", rl.token), zap.String("lock key", rl.key)) - lockTime := time.Duration(rl.lockLeaseTime/3) * time.Millisecond + rl.logger.Info("lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + + lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second timer := time.NewTimer(lockTime) defer timer.Stop() -LOOP: + for { select { case <-timer.C: - timer.Reset(lockTime) - // update key expire time - res := rl.client.Eval(refreshLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token) + // extend key lease time + res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token) val, err := res.Int() - if err != nil { - panic(err) + if err != redis.Nil && err != nil { + rl.logger.Info("lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + return } - if val == 0 { - rl.logger.Debug("not find the lock key of self") - break LOOP - } - case <-rl.exit: - break LOOP + if constant.RedisCode(val) == constant.RefreshLockFailure { + rl.logger.Error("lock refreash failed,can not find the lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + break + } + + if constant.RedisCode(val) == constant.RefreshLockSuccess { + rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + } + timer.Reset(lockTime) + case <-rl.exit: + break } } - rl.logger.Debug("refresh routine release", zap.String("token", rl.token)) } func (rl *redissionLocker) cancelRefreshLockTime() { @@ -246,53 +172,72 @@ func (rl *redissionLocker) cancelRefreshLockTime() { } } -func (rl *redissionLocker) tryLock() (time.Duration, error) { - res := rl.client.Eval(lockScript, []string{rl.key}, rl.lockLeaseTime, rl.token) - v, err := res.Result() +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ +func (rl *redissionLocker) tryLock() error { + lockType := constant.LockType + res := rl.client.Eval(luascript.LockScript, []string{rl.key}, rl.lockLeaseTime, rl.token) + val, err := res.Int() if err != redis.Nil && err != nil { - return 0, err + return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error()) } - - if v == nil { - return 0, nil - } - - return time.Duration(v.(int64)), nil + return constant.NewRedisResult(constant.RedisCode(val), lockType, "") } -func (rl *redissionLocker) UnLock() { - res := rl.client.Eval(unlockScript, []string{rl.key, rl.waitChanKey}, unlockMessage, rl.lockLeaseTime, rl.token) - val, err := res.Result() +/* +KEYS[1]:锁的键名(key),通常是锁的唯一标识。 +KEYS[2]:锁的释放通知频道(chankey),用于通知其他客户端锁已释放。 +ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 +ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 +*/ +func (rl *redissionLocker) UnLock() error { + res := rl.client.Eval(luascript.UnLockScript, []string{rl.key, rl.waitChanKey}, unlockMessage, rl.token) + val, err := res.Int() if err != redis.Nil && err != nil { - panic(err) + rl.logger.Info("unlock lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + return fmt.Errorf("unlock lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnLockType, err.Error())) } - if val == nil { - panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token) + + if constant.RedisCode(val) == constant.UnLockSuccess { + if rl.needRefresh { + rl.cancelRefreshLockTime() + } + + rl.logger.Info("unlock lock success", zap.String("token", rl.token), zap.String("key", rl.key)) + return nil } - rl.logger.Debug("unlock", zap.String("token", rl.token), zap.String("key", rl.key)) - if val.(int64) == 1 { - rl.cancelRefreshLockTime() + + if constant.RedisCode(val) == constant.UnLocakFailureWithLockOccupancy { + rl.logger.Info("unlock lock failed", zap.String("token", rl.token), zap.String("key", rl.key)) + return fmt.Errorf("unlock lock failed:%w", constant.NewRedisResult(constant.UnLocakFailureWithLockOccupancy, constant.UnLockType, "")) } + return nil } func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker { r := &redissionLocker{ - token: uuid.New().String(), - client: client, - exit: make(chan struct{}), - once: &sync.Once{}, + token: uuid.New().String(), + needRefresh: true, + client: client, + exit: make(chan struct{}), + logger: logger.GetLoggerInstance(), } if len(ops.Prefix) <= 0 { ops.Prefix = "redission-lock" } + if len(ops.ChanPrefix) <= 0 { ops.ChanPrefix = "redission-lock-channel" } + if ops.LockLeaseTime == 0 { r.lockLeaseTime = internalLockLeaseTime } r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") - r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":") + r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":") return r } diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index 5a35133..529ff75 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -17,9 +17,7 @@ import ( type RedissionRWLocker struct { redissionLocker - writeWaitChanKey string - rwTimeoutPrefix string - needRefresh bool + rwTokenTimeoutPrefix string } func (rl *RedissionRWLocker) RLock(timeout ...time.Duration) error { @@ -43,7 +41,7 @@ func (rl *RedissionRWLocker) RLock(timeout ...time.Duration) error { subMsg := make(chan struct{}, 1) defer close(subMsg) - sub := rl.client.Subscribe(rl.writeWaitChanKey) + sub := rl.client.Subscribe(rl.waitChanKey) defer sub.Close() go rl.subscribeLock(sub, subMsg) @@ -76,13 +74,13 @@ func (rl *RedissionRWLocker) RLock(timeout ...time.Duration) error { } } } - return fmt.Errorf("lock read lock failed:%w", result) + return fmt.Errorf("lock the redis read lock failed:%w", result) } func (rl *RedissionRWLocker) tryRLock() error { lockType := constant.LockType - res := rl.client.Eval(luascript.RLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) + res := rl.client.Eval(luascript.RLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error()) @@ -91,7 +89,7 @@ func (rl *RedissionRWLocker) tryRLock() error { } func (rl *RedissionRWLocker) refreshLockTimeout() { - rl.logger.Info("read lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + rl.logger.Info("lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second timer := time.NewTimer(lockTime) @@ -101,20 +99,20 @@ func (rl *RedissionRWLocker) refreshLockTimeout() { select { case <-timer.C: // extend key lease time - res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) + res := rl.client.Eval(luascript.RefreshRWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { - rl.logger.Info("read lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + rl.logger.Info("lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) return } if constant.RedisCode(val) == constant.RefreshLockFailure { - rl.logger.Error("read lock refreash failed,can not find the read lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + rl.logger.Error("lock refreash failed,can not find the read lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) break } if constant.RedisCode(val) == constant.RefreshLockSuccess { - rl.logger.Info("read lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) + rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) } timer.Reset(lockTime) case <-rl.exit: @@ -124,11 +122,11 @@ func (rl *RedissionRWLocker) refreshLockTimeout() { } func (rl *RedissionRWLocker) UnRLock() error { - res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.token) + res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) - return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.LockType, err.Error())) + return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnLockType, err.Error())) } if (constant.RedisCode(val) == constant.UnLockSuccess) || (constant.RedisCode(val) == constant.UnRLockSuccess) { @@ -168,7 +166,7 @@ func (rl *RedissionRWLocker) WLock(timeout ...time.Duration) error { subMsg := make(chan struct{}, 1) defer close(subMsg) - sub := rl.client.Subscribe(rl.writeWaitChanKey) + sub := rl.client.Subscribe(rl.waitChanKey) defer sub.Close() go rl.subscribeLock(sub, subMsg) @@ -206,7 +204,7 @@ func (rl *RedissionRWLocker) WLock(timeout ...time.Duration) error { func (rl *RedissionRWLocker) tryWLock() error { lockType := constant.LockType - res := rl.client.Eval(luascript.WLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token) + res := rl.client.Eval(luascript.WLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error()) @@ -215,7 +213,7 @@ func (rl *RedissionRWLocker) tryWLock() error { } func (rl *RedissionRWLocker) UnWLock() error { - res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) + res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) @@ -239,16 +237,21 @@ func (rl *RedissionRWLocker) UnWLock() error { func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLocker { r := &redissionLocker{ - token: uuid.New().String(), - client: client, - exit: make(chan struct{}), - logger: logger.GetLoggerInstance(), + token: uuid.New().String(), + needRefresh: true, + client: client, + exit: make(chan struct{}), + logger: logger.GetLoggerInstance(), } if len(ops.Prefix) <= 0 { ops.Prefix = "redission-rwlock" } + if len(ops.TimeoutPrefix) <= 0 { + ops.TimeoutPrefix = "rwlock_timeout" + } + if len(ops.ChanPrefix) <= 0 { ops.ChanPrefix = "redission-rwlock-channel" } @@ -256,13 +259,13 @@ func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLoc if ops.LockLeaseTime == 0 { r.lockLeaseTime = internalLockLeaseTime } + r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") + r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key, "write"}, ":") rwLocker := &RedissionRWLocker{ - redissionLocker: *r, - writeWaitChanKey: strings.Join([]string{r.key, "write"}, ":"), - rwTimeoutPrefix: "rwlock_timeout", - needRefresh: true, + redissionLocker: *r, + rwTokenTimeoutPrefix: ops.TimeoutPrefix, } return rwLocker }