optimize structer of redisRWLock and acquisition statements of write lock
This commit is contained in:
parent
c08f4b91f5
commit
09225fc96f
|
|
@ -1,23 +1,25 @@
|
|||
package constant
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type RedisResult int
|
||||
type RedisCode int
|
||||
|
||||
const (
|
||||
LockSuccess = RedisResult(1)
|
||||
UnLockSuccess = RedisResult(1)
|
||||
RefreshLockSuccess = RedisResult(1)
|
||||
UnRLockSuccess = RedisResult(0)
|
||||
RLockFailure = RedisResult(-1)
|
||||
UnRLockFailureWithWLockOccupancy = RedisResult(-2)
|
||||
WLockFailureWithRLockOccupancy = RedisResult(-3)
|
||||
WLockFailureWithWLockOccupancy = RedisResult(-4)
|
||||
UnWLockFailureWithRLockOccupancy = RedisResult(-5)
|
||||
UnWLockFailureWithWLockOccupancy = RedisResult(-6)
|
||||
WLockFailureWithNotFirstPriority = RedisResult(-7)
|
||||
RefreshLockFailure = RedisResult(-8)
|
||||
UnknownInternalError = RedisResult(-99)
|
||||
LockSuccess = RedisCode(1)
|
||||
UnLockSuccess = RedisCode(1)
|
||||
RefreshLockSuccess = RedisCode(1)
|
||||
UnRLockSuccess = RedisCode(0)
|
||||
RLockFailureWithWLockOccupancy = RedisCode(-1)
|
||||
UnRLockFailureWithWLockOccupancy = RedisCode(-2)
|
||||
WLockFailureWithRLockOccupancy = RedisCode(-3)
|
||||
WLockFailureWithWLockOccupancy = RedisCode(-4)
|
||||
UnWLockFailureWithRLockOccupancy = RedisCode(-5)
|
||||
UnWLockFailureWithWLockOccupancy = RedisCode(-6)
|
||||
WLockFailureWithNotFirstPriority = RedisCode(-7)
|
||||
RefreshLockFailure = RedisCode(-8)
|
||||
UnknownInternalError = RedisCode(-99)
|
||||
)
|
||||
|
||||
type RedisLockType int
|
||||
|
|
@ -28,68 +30,64 @@ const (
|
|||
RefreshLockType
|
||||
)
|
||||
|
||||
type RedisError struct {
|
||||
Code RedisResult
|
||||
type RedisResult struct {
|
||||
Code RedisCode
|
||||
Message string
|
||||
}
|
||||
|
||||
func (e *RedisError) Error() string {
|
||||
func (e *RedisResult) Error() string {
|
||||
return fmt.Sprintf("redis execution code:%d,message:%s\n", e.Code, e.Message)
|
||||
}
|
||||
|
||||
func (e *RedisError) OutputResultMessage() string {
|
||||
func (e *RedisResult) OutputResultMessage() string {
|
||||
return e.Message
|
||||
}
|
||||
|
||||
func (e *RedisError) OutputResultCode() int {
|
||||
func (e *RedisResult) OutputResultCode() int {
|
||||
return int(e.Code)
|
||||
}
|
||||
|
||||
func NewRedisError(res RedisResult) error {
|
||||
resInt := int(res)
|
||||
switch resInt {
|
||||
case -1:
|
||||
return &RedisError{Code: -1, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func ConvertResultToErr(res RedisResult, lockType RedisLockType, redisMsg string) error {
|
||||
func NewRedisResult(res RedisCode, lockType RedisLockType, redisMsg string) error {
|
||||
resInt := int(res)
|
||||
switch resInt {
|
||||
case 1:
|
||||
if lockType == LockType {
|
||||
return &RedisError{Code: res, Message: "redis lock success"}
|
||||
return &RedisResult{Code: res, Message: "redis lock success"}
|
||||
} else if lockType == UnLockType {
|
||||
return &RedisError{Code: res, Message: "redis unlock success"}
|
||||
return &RedisResult{Code: res, Message: "redis unlock success"}
|
||||
} else {
|
||||
return &RedisError{Code: res, Message: "redis refresh lock success"}
|
||||
return &RedisResult{Code: res, Message: "redis refresh lock success"}
|
||||
}
|
||||
case 0:
|
||||
return &RedisError{Code: res, Message: "redis unlock read lock success, the lock is still occupied by other processes read lock"}
|
||||
return &RedisResult{Code: res, Message: "redis unlock read lock success, the lock is still occupied by other processes read lock"}
|
||||
case -1:
|
||||
return &RedisError{Code: res, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"}
|
||||
return &RedisResult{Code: res, Message: "redis lock read lock failure,the lock is already occupied by another processes write lock"}
|
||||
case -2:
|
||||
return &RedisError{Code: res, Message: "redis un lock read lock failure,the lock is already occupied by another processes write lock"}
|
||||
return &RedisResult{Code: res, Message: "redis un lock read lock failure,the lock is already occupied by another processes write lock"}
|
||||
case -3:
|
||||
return &RedisError{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes read lock"}
|
||||
return &RedisResult{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes read lock"}
|
||||
case -4:
|
||||
return &RedisError{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes write lock"}
|
||||
return &RedisResult{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes write lock"}
|
||||
case -5:
|
||||
return &RedisError{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes read lock"}
|
||||
return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes read lock"}
|
||||
case -6:
|
||||
return &RedisError{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes write lock"}
|
||||
return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes write lock"}
|
||||
case -7:
|
||||
return &RedisError{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"}
|
||||
return &RedisResult{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"}
|
||||
case -8:
|
||||
return &RedisError{Code: res, Message: "redis refresh lock failure,the lock not exist"}
|
||||
return &RedisResult{Code: res, Message: "redis refresh lock failure,the lock not exist"}
|
||||
case -99:
|
||||
return &RedisResult{Code: res, Message: "redis internal execution error"}
|
||||
default:
|
||||
return &RedisError{Code: res, Message: fmt.Sprintf("unkown redis execution result:%s\n", redisMsg)}
|
||||
msg := "unkown redis execution result"
|
||||
if redisMsg != "" {
|
||||
msg = fmt.Sprintf("%s:%s\n", msg, redisMsg)
|
||||
}
|
||||
return &RedisResult{Code: res, Message: msg}
|
||||
}
|
||||
}
|
||||
|
||||
func TranslateResultToStr(res RedisResult, lockType RedisLockType) string {
|
||||
func TranslateResultToStr(res RedisCode, lockType RedisLockType) string {
|
||||
resInt := int(res)
|
||||
switch resInt {
|
||||
case 1:
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ if (mode == false) then
|
|||
end;
|
||||
|
||||
if (mode == 'write') then
|
||||
-- TODO 放到 list 中等待写锁释放后再次尝试加锁并且订阅写锁释放的消息
|
||||
local key = KEYS[1] .. ':read';
|
||||
redis.call('rpush', key, ARGV[2]);
|
||||
-- 放到 list 中等待写锁释放后再次尝试加锁并且订阅写锁释放的消息
|
||||
local waitKey = KEYS[1] .. ':read';
|
||||
redis.call('rpush', waitKey, ARGV[2]);
|
||||
return -1;
|
||||
end;
|
||||
|
||||
|
|
@ -60,8 +60,7 @@ end;
|
|||
/*
|
||||
KEYS[1]:锁的键名(key),通常是锁的唯一标识。
|
||||
KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。
|
||||
KEYS[3]:锁的释放通知读频道(chankey),用于通知其他客户端锁已释放。
|
||||
KEYS[4]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。
|
||||
KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。
|
||||
ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。
|
||||
ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。
|
||||
*/
|
||||
|
|
@ -74,8 +73,6 @@ if (mode == false) then
|
|||
local counter = redis.call('llen',writeWait)
|
||||
if (counter >= 1) then
|
||||
redis.call('publish', KEYS[4], ARGV[1]);
|
||||
else
|
||||
redis.call('publish', KEYS[3], ARGV[1]);
|
||||
end;
|
||||
return 1;
|
||||
elseif (mode == 'write') then
|
||||
|
|
@ -150,7 +147,7 @@ if (mode == false) then
|
|||
redis.call('lpop', waitKey, '1')
|
||||
return 1;
|
||||
elseif (mode == 'read') then
|
||||
-- TODO 放到 list 中等待读锁释放后再次尝试加锁并且订阅读锁释放的消息
|
||||
-- 放到 list 中等待读锁释放后再次尝试加锁并且订阅读锁释放的消息
|
||||
redis.call('rpush', waitkey, ARGV[2]);
|
||||
return -3;
|
||||
else
|
||||
|
|
@ -174,8 +171,7 @@ end;
|
|||
/*
|
||||
KEYS[1]:锁的键名(key),通常是锁的唯一标识。
|
||||
KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。
|
||||
KEYS[3]:锁的释放通知读频道(chankey),用于通知其他客户端锁已释放。
|
||||
KEYS[4]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。
|
||||
KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。
|
||||
ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。
|
||||
ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。
|
||||
*/
|
||||
|
|
@ -186,8 +182,6 @@ if (mode == false) then
|
|||
-- 优先写锁加锁,无写锁的情况通知读锁加锁
|
||||
local counter = redis.call('llen',writeWait)
|
||||
if (counter >= 1) then
|
||||
redis.call('publish', KEYS[4], ARGV[1]);
|
||||
else
|
||||
redis.call('publish', KEYS[3], ARGV[1]);
|
||||
end;
|
||||
return 1;
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ var unlockScript string = strings.Join([]string{
|
|||
}, "")
|
||||
|
||||
const (
|
||||
internalLockLeaseTime = uint64(30) * 1000
|
||||
internalLockLeaseTime = uint64(30)
|
||||
unlockMessage = 0
|
||||
)
|
||||
|
||||
|
|
@ -71,7 +71,7 @@ type RedissionLockConfig struct {
|
|||
type redissionLocker struct {
|
||||
token string
|
||||
key string
|
||||
waitChankey string
|
||||
waitChanKey string
|
||||
exit chan struct{}
|
||||
lockLeaseTime uint64
|
||||
client *redis.Client
|
||||
|
|
@ -98,7 +98,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) {
|
|||
|
||||
submsg := make(chan struct{}, 1)
|
||||
defer close(submsg)
|
||||
sub := rl.client.Subscribe(rl.waitChankey)
|
||||
sub := rl.client.Subscribe(rl.waitChanKey)
|
||||
defer sub.Close()
|
||||
go rl.subscribeLock(sub, submsg)
|
||||
// listen := rl.listenManager.Subscribe(rl.key, rl.token)
|
||||
|
|
@ -242,7 +242,6 @@ LOOP:
|
|||
func (rl *redissionLocker) cancelRefreshLockTime() {
|
||||
if rl.exit != nil {
|
||||
close(rl.exit)
|
||||
rl.exit = nil
|
||||
rl.once = &sync.Once{}
|
||||
}
|
||||
}
|
||||
|
|
@ -262,7 +261,7 @@ func (rl *redissionLocker) tryLock() (time.Duration, error) {
|
|||
}
|
||||
|
||||
func (rl *redissionLocker) UnLock() {
|
||||
res := rl.client.Eval(unlockScript, []string{rl.key, rl.waitChankey}, unlockMessage, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(unlockScript, []string{rl.key, rl.waitChanKey}, unlockMessage, rl.lockLeaseTime, rl.token)
|
||||
val, err := res.Result()
|
||||
if err != redis.Nil && err != nil {
|
||||
panic(err)
|
||||
|
|
@ -294,6 +293,6 @@ func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker
|
|||
r.lockLeaseTime = internalLockLeaseTime
|
||||
}
|
||||
r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":")
|
||||
r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":")
|
||||
r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":")
|
||||
return r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package distributed_lock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"modelRT/distributedlock/constant"
|
||||
|
|
@ -17,26 +15,25 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type redissionReadLocker struct {
|
||||
type RedissionRWLocker struct {
|
||||
redissionLocker
|
||||
writeWaitChanKey string
|
||||
rwTimeoutPrefix string
|
||||
prefixKey string
|
||||
needRefresh bool
|
||||
}
|
||||
|
||||
// TODO 将参数中的 ctx 优化掉
|
||||
func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duration) error {
|
||||
func (rl *RedissionRWLocker) RLock(timeout ...time.Duration) error {
|
||||
if rl.exit == nil {
|
||||
rl.exit = make(chan struct{})
|
||||
}
|
||||
|
||||
resultErr := rl.tryLock().(*constant.RedisError)
|
||||
if resultErr.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(resultErr.OutputResultMessage())
|
||||
return fmt.Errorf("get read lock failed:%w", resultErr)
|
||||
result := rl.tryRLock().(*constant.RedisResult)
|
||||
if result.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(result.OutputResultMessage())
|
||||
return fmt.Errorf("get read lock failed:%w", result)
|
||||
}
|
||||
|
||||
if (resultErr.Code == constant.LockSuccess) && rl.needRefresh {
|
||||
if (result.Code == constant.LockSuccess) && rl.needRefresh {
|
||||
rl.once.Do(func() {
|
||||
// async refresh lock timeout unitl receive exit singal
|
||||
go rl.refreshLockTimeout()
|
||||
|
|
@ -44,21 +41,17 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio
|
|||
return nil
|
||||
}
|
||||
|
||||
var acquireTimer *time.Timer
|
||||
if len(timeout) > 0 && timeout[0] > 0 {
|
||||
acquireTimer = time.NewTimer(timeout[0])
|
||||
}
|
||||
|
||||
subMsg := make(chan struct{}, 1)
|
||||
defer close(subMsg)
|
||||
sub := rl.client.Subscribe(rl.waitChankey)
|
||||
sub := rl.client.Subscribe(rl.writeWaitChanKey)
|
||||
defer sub.Close()
|
||||
go rl.subscribeLock(sub, subMsg)
|
||||
|
||||
if len(timeout) > 0 && timeout[0] > 0 {
|
||||
acquireTimer = time.NewTimer(timeout[0])
|
||||
acquireTimer := time.NewTimer(timeout[0])
|
||||
for {
|
||||
select {
|
||||
|
||||
case _, ok := <-subMsg:
|
||||
if !ok {
|
||||
err := errors.New("failed to read the read lock waiting for for the channel message")
|
||||
|
|
@ -66,8 +59,8 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio
|
|||
return err
|
||||
}
|
||||
|
||||
resultErr := rl.tryLock().(*constant.RedisError)
|
||||
if (resultErr.Code == constant.RLockFailure) || (resultErr.Code == constant.UnknownInternalError) {
|
||||
resultErr := rl.tryRLock().(*constant.RedisResult)
|
||||
if (resultErr.Code == constant.RLockFailureWithWLockOccupancy) || (resultErr.Code == constant.UnknownInternalError) {
|
||||
rl.logger.Info(resultErr.OutputResultMessage())
|
||||
continue
|
||||
}
|
||||
|
|
@ -83,229 +76,193 @@ func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duratio
|
|||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("get read lock failed:%w", constant.NewRedisError(constant.RLockFailure))
|
||||
return fmt.Errorf("lock read lock failed:%w", result)
|
||||
}
|
||||
|
||||
func (rl *redissionReadLocker) tryLock() error {
|
||||
func (rl *RedissionRWLocker) tryRLock() error {
|
||||
lockType := constant.LockType
|
||||
|
||||
res := rl.client.Eval(luascript.RLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
v, err := res.Result()
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
return constant.ConvertResultToErr(constant.UnknownInternalError, lockType, err.Error())
|
||||
return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error())
|
||||
}
|
||||
return constant.ConvertResultToErr(v.(constant.RedisResult), lockType, "")
|
||||
return constant.NewRedisResult(constant.RedisCode(val), lockType, "")
|
||||
}
|
||||
|
||||
func (rl *redissionReadLocker) refreshLockTimeout() {
|
||||
rl.logger.Debug("rlock: %s lock %s\n", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Millisecond
|
||||
func (rl *RedissionRWLocker) refreshLockTimeout() {
|
||||
rl.logger.Info("read lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second
|
||||
timer := time.NewTimer(lockTime)
|
||||
defer timer.Stop()
|
||||
LOOP:
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
timer.Reset(lockTime)
|
||||
// update key expire time
|
||||
res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.prefixKey}, rl.lockLeaseTime, rl.token)
|
||||
// extend key lease time
|
||||
res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
val, err := res.Int()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if val == 0 {
|
||||
rl.logger.Debug("not find the rlock key of self")
|
||||
break LOOP
|
||||
}
|
||||
case <-rl.exit:
|
||||
break LOOP
|
||||
|
||||
}
|
||||
}
|
||||
rl.logger.Debug("rlock: refresh routine release", zap.String("token", rl.token))
|
||||
}
|
||||
|
||||
func (rl *redissionReadLocker) UnLock() {
|
||||
res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.waitChankey, rl.rwTimeoutPrefix, rl.prefixKey}, unlockMessage, rl.token)
|
||||
val, err := res.Result()
|
||||
if err != redis.Nil && err != nil {
|
||||
panic(err)
|
||||
rl.logger.Info("read lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
if val == nil {
|
||||
panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token)
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockFailure {
|
||||
rl.logger.Error("read lock refreash failed,can not find the read lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
break
|
||||
}
|
||||
rl.logger.Debug("lock: %s unlock %s\n", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
if val.(int64) == 1 {
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockSuccess {
|
||||
rl.logger.Info("read lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
}
|
||||
timer.Reset(lockTime)
|
||||
case <-rl.exit:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *RedissionRWLocker) UnRLock() error {
|
||||
res := rl.client.Eval(luascript.UnRLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.LockType, err.Error()))
|
||||
}
|
||||
|
||||
if (constant.RedisCode(val) == constant.UnLockSuccess) || (constant.RedisCode(val) == constant.UnRLockSuccess) {
|
||||
if rl.needRefresh {
|
||||
rl.cancelRefreshLockTime()
|
||||
}
|
||||
|
||||
rl.logger.Info("unlock read lock success", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
return nil
|
||||
}
|
||||
|
||||
type redissionWriteLocker struct {
|
||||
redissionLocker
|
||||
if constant.RedisCode(val) == constant.UnRLockFailureWithWLockOccupancy {
|
||||
rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnRLockFailureWithWLockOccupancy, constant.UnLockType, ""))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rl *redissionWriteLocker) Lock(ctx context.Context, timeout ...time.Duration) {
|
||||
func (rl *RedissionRWLocker) WLock(timeout ...time.Duration) error {
|
||||
if rl.exit == nil {
|
||||
rl.exit = make(chan struct{})
|
||||
}
|
||||
ttl, err := rl.tryLock()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
result := rl.tryWLock().(*constant.RedisResult)
|
||||
if result.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(result.OutputResultMessage())
|
||||
return fmt.Errorf("get write lock failed:%w", result)
|
||||
}
|
||||
|
||||
if ttl <= 0 {
|
||||
if (result.Code == constant.LockSuccess) && rl.needRefresh {
|
||||
rl.once.Do(func() {
|
||||
// async refresh lock timeout unitl receive exit singal
|
||||
go rl.refreshLockTimeout()
|
||||
})
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
submsg := make(chan struct{}, 1)
|
||||
defer close(submsg)
|
||||
sub := rl.client.Subscribe(rl.waitChankey)
|
||||
subMsg := make(chan struct{}, 1)
|
||||
defer close(subMsg)
|
||||
sub := rl.client.Subscribe(rl.writeWaitChanKey)
|
||||
defer sub.Close()
|
||||
go rl.subscribeLock(sub, submsg)
|
||||
// listen := rl.listenManager.Subscribe(rl.key, rl.token)
|
||||
// defer rl.listenManager.UnSubscribe(rl.key, rl.token)
|
||||
go rl.subscribeLock(sub, subMsg)
|
||||
|
||||
timer := time.NewTimer(ttl)
|
||||
defer timer.Stop()
|
||||
// outimer 理解为如果超过这个时间没有获取到锁,就直接放弃
|
||||
var outimer *time.Timer
|
||||
if len(timeout) > 0 && timeout[0] > 0 {
|
||||
outimer = time.NewTimer(timeout[0])
|
||||
}
|
||||
LOOP:
|
||||
acquireTimer := time.NewTimer(timeout[0])
|
||||
for {
|
||||
ttl, err = rl.tryLock()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if ttl <= 0 {
|
||||
rl.once.Do(func() {
|
||||
go rl.refreshLockTimeout()
|
||||
})
|
||||
return
|
||||
}
|
||||
if outimer != nil {
|
||||
select {
|
||||
case _, ok := <-submsg:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
|
||||
case _, ok := <-subMsg:
|
||||
if !ok {
|
||||
panic("lock listen release")
|
||||
err := errors.New("failed to read the write lock waiting for for the channel message")
|
||||
rl.logger.Error("failed to read the read lock waiting for for the channel message")
|
||||
return err
|
||||
}
|
||||
|
||||
timer.Reset(ttl)
|
||||
case <-ctx.Done():
|
||||
// break LOOP
|
||||
panic("lock context already release")
|
||||
case <-timer.C:
|
||||
timer.Reset(ttl)
|
||||
case <-outimer.C:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
break LOOP
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case _, ok := <-submsg:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
result := rl.tryWLock().(*constant.RedisResult)
|
||||
if (result.Code == constant.UnknownInternalError) || (result.Code == constant.WLockFailureWithRLockOccupancy) || (result.Code == constant.WLockFailureWithWLockOccupancy) || (result.Code == constant.WLockFailureWithNotFirstPriority) {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
continue
|
||||
}
|
||||
|
||||
if !ok {
|
||||
panic("lock listen release")
|
||||
if result.Code == constant.LockSuccess {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
return nil
|
||||
}
|
||||
case <-acquireTimer.C:
|
||||
err := errors.New("the waiting time for obtaining the write lock operation has timed out")
|
||||
rl.logger.Info("the waiting time for obtaining the write lock operation has timed out")
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("lock write lock failed:%w", result)
|
||||
}
|
||||
|
||||
timer.Reset(ttl)
|
||||
case <-ctx.Done():
|
||||
// break LOOP
|
||||
panic("lock context already release")
|
||||
case <-timer.C:
|
||||
timer.Reset(ttl)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func (rl *RedissionRWLocker) tryWLock() error {
|
||||
lockType := constant.LockType
|
||||
|
||||
func (rl *redissionWriteLocker) tryLock() (time.Duration, error) {
|
||||
res := rl.client.Eval(luascript.WLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token)
|
||||
v, err := res.Result()
|
||||
res := rl.client.Eval(luascript.WLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
return 0, err
|
||||
return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error())
|
||||
}
|
||||
return constant.NewRedisResult(constant.RedisCode(val), lockType, "")
|
||||
}
|
||||
|
||||
if v == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return time.Duration(v.(int64)), nil
|
||||
}
|
||||
|
||||
func (rl *redissionWriteLocker) UnLock() {
|
||||
res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.waitChankey}, unlockMessage, rl.lockLeaseTime, rl.token)
|
||||
val, err := res.Result()
|
||||
func (rl *RedissionRWLocker) UnWLock() error {
|
||||
res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
panic(err)
|
||||
rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnLockType, err.Error()))
|
||||
}
|
||||
if val == nil {
|
||||
panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token)
|
||||
}
|
||||
rl.logger.Debug("lock: unlock", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
if val.(int64) == 1 {
|
||||
|
||||
if constant.RedisCode(val) == constant.UnLockSuccess {
|
||||
if rl.needRefresh {
|
||||
rl.cancelRefreshLockTime()
|
||||
}
|
||||
rl.logger.Info("unlock write lock success", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetReadLocker(client *redis.Client, ops *RedissionLockConfig) *redissionReadLocker {
|
||||
if (constant.RedisCode(val) == constant.UnWLockFailureWithRLockOccupancy) || (constant.RedisCode(val) == constant.UnWLockFailureWithWLockOccupancy) {
|
||||
rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.RedisCode(val), constant.UnLockType, ""))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLocker {
|
||||
r := &redissionLocker{
|
||||
token: uuid.New().String(),
|
||||
client: client,
|
||||
exit: make(chan struct{}),
|
||||
once: &sync.Once{},
|
||||
}
|
||||
|
||||
if len(ops.Prefix) <= 0 {
|
||||
ops.Prefix = "redission-rwlock"
|
||||
}
|
||||
|
||||
if len(ops.ChanPrefix) <= 0 {
|
||||
ops.ChanPrefix = "redission-rwlock-channel"
|
||||
}
|
||||
|
||||
if ops.LockLeaseTime == 0 {
|
||||
r.lockLeaseTime = internalLockLeaseTime
|
||||
}
|
||||
r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":")
|
||||
r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":")
|
||||
tkey := strings.Join([]string{"{", r.key, "}"}, "")
|
||||
return &redissionReadLocker{redissionLocker: *r, rwTimeoutPrefix: strings.Join([]string{tkey, r.token, "rwlock_timeout"}, ":"), prefixKey: tkey, needRefresh: true}
|
||||
}
|
||||
|
||||
func GetWriteLocker(client *redis.Client, ops *RedissionLockConfig) *redissionWriteLocker {
|
||||
r := &redissionLocker{
|
||||
token: uuid.New().String(),
|
||||
client: client,
|
||||
exit: make(chan struct{}),
|
||||
once: &sync.Once{},
|
||||
logger: logger.GetLoggerInstance(),
|
||||
}
|
||||
|
||||
if len(ops.Prefix) <= 0 {
|
||||
ops.Prefix = "redission-rwlock"
|
||||
}
|
||||
|
||||
if len(ops.ChanPrefix) <= 0 {
|
||||
ops.ChanPrefix = "redission-rwlock-channel"
|
||||
}
|
||||
|
||||
if ops.LockLeaseTime == 0 {
|
||||
r.lockLeaseTime = internalLockLeaseTime
|
||||
}
|
||||
r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":")
|
||||
r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":")
|
||||
return &redissionWriteLocker{redissionLocker: *r}
|
||||
|
||||
rwLocker := &RedissionRWLocker{
|
||||
redissionLocker: *r,
|
||||
writeWaitChanKey: strings.Join([]string{r.key, "write"}, ":"),
|
||||
rwTimeoutPrefix: "rwlock_timeout",
|
||||
needRefresh: true,
|
||||
}
|
||||
return rwLocker
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue