Compare commits
2 Commits
d27a9bbafa
...
23110cbba9
| Author | SHA1 | Date |
|---|---|---|
|
|
23110cbba9 | |
|
|
310f4c043c |
|
|
@ -14,8 +14,8 @@ local lockKey = KEYS[2] .. ':' .. ARGV[2];
|
|||
if (mode == false) then
|
||||
redis.call('hset', KEYS[1], 'mode', 'read');
|
||||
redis.call('hset', KEYS[1], lockKey, '1');
|
||||
redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('expire', KEYS[1], ARGV[1]);
|
||||
redis.call('hpexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('pexpire', KEYS[1], ARGV[1]);
|
||||
return 1;
|
||||
end;
|
||||
|
||||
|
|
@ -29,11 +29,11 @@ end;
|
|||
if (mode == 'read') then
|
||||
if (redis.call('exists', KEYS[1], ARGV[2]) == 1) then
|
||||
redis.call('hincrby', KEYS[1], lockKey, '1');
|
||||
local remainTime = redis.call('httl', KEYS[1], 'fields', '1', lockKey);
|
||||
redis.call('hexpire', KEYS[1], math.max(tonumber(remainTime[1]), ARGV[1]), 'fields', '1', lockKey);
|
||||
local remainTime = redis.call('hpttl', KEYS[1], 'fields', '1', lockKey);
|
||||
redis.call('hpexpire', KEYS[1], math.max(tonumber(remainTime[1]), ARGV[1]), 'fields', '1', lockKey);
|
||||
else
|
||||
redis.call('hset', KEYS[1], lockKey, '1');
|
||||
redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('hpexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
end;
|
||||
local cursor = 0;
|
||||
local maxRemainTime = tonumber(ARGV[1]);
|
||||
|
|
@ -45,13 +45,13 @@ if (mode == 'read') then
|
|||
|
||||
for i = 1, #fields,2 do
|
||||
local field = fields[i];
|
||||
local remainTime = redis.call('httl', KEYS[1], 'fields', '1', field);
|
||||
local remainTime = redis.call('hpttl', KEYS[1], 'fields', '1', field);
|
||||
maxRemainTime = math.max(tonumber(remainTime[1]), maxRemainTime);
|
||||
end;
|
||||
until cursor == 0;
|
||||
|
||||
local remainTime = redis.call('ttl', KEYS[1]);
|
||||
redis.call('expire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
local remainTime = redis.call('pttl', KEYS[1]);
|
||||
redis.call('pexpire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
return 1;
|
||||
end;
|
||||
`
|
||||
|
|
@ -86,7 +86,7 @@ if ((mode == 'read') and (lockExists == 0)) then
|
|||
end;
|
||||
|
||||
local counter = redis.call('hincrby', KEYS[1], lockKey, -1);
|
||||
local delTTLs = redis.call('httl', KEYS[1], 'fields', '1', lockKey);
|
||||
local delTTLs = redis.call('hpttl', KEYS[1], 'fields', '1', lockKey);
|
||||
local delTTL = tonumber(delTTLs[1]);
|
||||
if (counter == 0) then
|
||||
redis.call('hdel', KEYS[1], lockKey);
|
||||
|
|
@ -103,17 +103,17 @@ if (redis.call('hlen', KEYS[1]) > 1) then
|
|||
|
||||
for i = 1, #fields,2 do
|
||||
local field = fields[i];
|
||||
local remainTime = redis.call('httl', KEYS[1], 'fields', '1', field);
|
||||
local remainTime = redis.call('hpttl', KEYS[1], 'fields', '1', field);
|
||||
maxRemainTime = math.max(tonumber(remainTime[1]), maxRemainTime);
|
||||
end;
|
||||
until cursor == 0;
|
||||
|
||||
if (maxRemainTime > 0) then
|
||||
if (delTTL > maxRemainTime) then
|
||||
redis.call('expire', KEYS[1], maxRemainTime);
|
||||
redis.call('pexpire', KEYS[1], maxRemainTime);
|
||||
else
|
||||
local remainTime = redis.call('ttl', KEYS[1]);
|
||||
redis.call('expire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
local remainTime = redis.call('pttl', KEYS[1]);
|
||||
redis.call('pexpire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
end;
|
||||
end;
|
||||
else
|
||||
|
|
@ -149,8 +149,8 @@ if (mode == false) then
|
|||
end;
|
||||
redis.call('hset', KEYS[1], 'mode', 'write');
|
||||
redis.call('hset', KEYS[1], lockKey, 1);
|
||||
redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('expire', KEYS[1], ARGV[1]);
|
||||
redis.call('hpexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('pexpire', KEYS[1], ARGV[1]);
|
||||
redis.call('lpop', waitKey, '1');
|
||||
return 1;
|
||||
elseif (mode == 'read') then
|
||||
|
|
@ -163,8 +163,8 @@ else
|
|||
local lockExists = redis.call('hexists', KEYS[1], lockKey);
|
||||
if (lockExists == 1) then
|
||||
redis.call('hincrby', KEYS[1], lockKey, 1);
|
||||
redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('expire', KEYS[1], ARGV[1]);
|
||||
redis.call('hpexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('pexpire', KEYS[1], ARGV[1]);
|
||||
return 1;
|
||||
end;
|
||||
-- 放到 list 中等待写锁释放后再次尝试加锁并且订阅写锁释放的消息
|
||||
|
|
@ -233,7 +233,7 @@ local lockExists = redis.call('hexists', KEYS[1], lockKey);
|
|||
local mode = redis.call('hget', KEYS[1], 'mode');
|
||||
local maxRemainTime = tonumber(ARGV[1]);
|
||||
if (lockExists == 1) then
|
||||
redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
redis.call('hpexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey);
|
||||
if (mode == 'read') then
|
||||
local cursor = 0;
|
||||
local pattern = KEYS[2] .. ':*';
|
||||
|
|
@ -244,19 +244,19 @@ if (lockExists == 1) then
|
|||
|
||||
for i = 1, #fields,2 do
|
||||
local field = fields[i];
|
||||
local remainTime = redis.call('httl', KEYS[1], 'fields', '1', field);
|
||||
local remainTime = redis.call('hpttl', KEYS[1], 'fields', '1', field);
|
||||
maxRemainTime = math.max(tonumber(remainTime[1]), maxRemainTime);
|
||||
end;
|
||||
until cursor == 0;
|
||||
|
||||
if (maxRemainTime > 0) then
|
||||
local remainTime = redis.call('ttl', KEYS[1]);
|
||||
redis.call('expire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
local remainTime = redis.call('pttl', KEYS[1]);
|
||||
redis.call('pexpire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime));
|
||||
end;
|
||||
elseif (mode == 'write') then
|
||||
redis.call('expire', KEYS[1], ARGV[1]);
|
||||
redis.call('pexpire', KEYS[1], ARGV[1]);
|
||||
end;
|
||||
-- return redis.call('ttl',KEYS[1]);
|
||||
-- return redis.call('pttl',KEYS[1]);
|
||||
return 1;
|
||||
end;
|
||||
return -8;
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
internalLockLeaseTime = uint64(30)
|
||||
internalLockLeaseTime = uint64(30 * 1000)
|
||||
unlockMessage = 0
|
||||
)
|
||||
|
||||
|
|
@ -35,15 +35,15 @@ type RedissionLockConfig struct {
|
|||
|
||||
type redissionLocker struct {
|
||||
lockLeaseTime uint64
|
||||
token string
|
||||
key string
|
||||
Token string
|
||||
Key string
|
||||
waitChanKey string
|
||||
needRefresh bool
|
||||
refreshExitChan chan struct{}
|
||||
subExitChan chan struct{}
|
||||
client *redis.Client
|
||||
refreshOnce *sync.Once
|
||||
logger *zap.Logger
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) error {
|
||||
|
|
@ -52,7 +52,7 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e
|
|||
}
|
||||
result := rl.tryLock(ctx).(*constant.RedisResult)
|
||||
if result.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(result.OutputResultMessage())
|
||||
rl.Logger.Error(result.OutputResultMessage())
|
||||
return fmt.Errorf("get lock failed:%w", result)
|
||||
}
|
||||
|
||||
|
|
@ -77,23 +77,23 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e
|
|||
case _, ok := <-subMsg:
|
||||
if !ok {
|
||||
err := errors.New("failed to read the lock waiting for for the channel message")
|
||||
rl.logger.Error("failed to read the lock waiting for for the channel message")
|
||||
rl.Logger.Error("failed to read the lock waiting for for the channel message")
|
||||
return err
|
||||
}
|
||||
|
||||
resultErr := rl.tryLock(ctx).(*constant.RedisResult)
|
||||
if (resultErr.Code == constant.LockFailure) || (resultErr.Code == constant.UnknownInternalError) {
|
||||
rl.logger.Info(resultErr.OutputResultMessage())
|
||||
rl.Logger.Info(resultErr.OutputResultMessage())
|
||||
continue
|
||||
}
|
||||
|
||||
if resultErr.Code == constant.LockSuccess {
|
||||
rl.logger.Info(resultErr.OutputResultMessage())
|
||||
rl.Logger.Info(resultErr.OutputResultMessage())
|
||||
return nil
|
||||
}
|
||||
case <-acquireTimer.C:
|
||||
err := errors.New("the waiting time for obtaining the lock operation has timed out")
|
||||
rl.logger.Info("the waiting time for obtaining the lock operation has timed out")
|
||||
rl.Logger.Info("the waiting time for obtaining the lock operation has timed out")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -105,7 +105,7 @@ func (rl *redissionLocker) subscribeLock(sub *redis.PubSub, subMsgChan chan stru
|
|||
if sub == nil || subMsgChan == nil {
|
||||
return
|
||||
}
|
||||
rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token))
|
||||
rl.Logger.Info("lock: enter sub routine", zap.String("token", rl.Token))
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
@ -126,9 +126,9 @@ ARGV[1]:锁的过期时间(lockLeaseTime),单位为秒。
|
|||
ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。
|
||||
*/
|
||||
func (rl *redissionLocker) refreshLockTimeout(ctx context.Context) {
|
||||
rl.logger.Info("lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("lock refresh by key and token", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Millisecond
|
||||
timer := time.NewTimer(lockTime)
|
||||
defer timer.Stop()
|
||||
|
||||
|
|
@ -136,20 +136,20 @@ func (rl *redissionLocker) refreshLockTimeout(ctx context.Context) {
|
|||
select {
|
||||
case <-timer.C:
|
||||
// extend key lease time
|
||||
res := rl.client.Eval(ctx, luascript.RefreshLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.RefreshLockScript, []string{rl.Key}, rl.lockLeaseTime, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Info("lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Info("lock refresh failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockFailure {
|
||||
rl.logger.Error("lock refreash failed,can not find the lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Error("lock refreash failed,can not find the lock by key and token", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
break
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockSuccess {
|
||||
rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("lock refresh success by key and token", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
}
|
||||
timer.Reset(lockTime)
|
||||
case <-rl.refreshExitChan:
|
||||
|
|
@ -169,7 +169,7 @@ func (rl *redissionLocker) closeSub(sub *redis.PubSub, noticeChan chan struct{})
|
|||
if sub != nil {
|
||||
err := sub.Close()
|
||||
if err != nil {
|
||||
rl.logger.Error("close sub failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Error("close sub failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -185,7 +185,7 @@ ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端
|
|||
*/
|
||||
func (rl *redissionLocker) tryLock(ctx context.Context) error {
|
||||
lockType := constant.LockType
|
||||
res := rl.client.Eval(ctx, luascript.LockScript, []string{rl.key}, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.LockScript, []string{rl.Key}, rl.lockLeaseTime, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error())
|
||||
|
|
@ -200,10 +200,10 @@ ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放
|
|||
ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。
|
||||
*/
|
||||
func (rl *redissionLocker) UnLock(ctx context.Context) error {
|
||||
res := rl.client.Eval(ctx, luascript.UnLockScript, []string{rl.key, rl.waitChanKey}, unlockMessage, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.UnLockScript, []string{rl.Key, rl.waitChanKey}, unlockMessage, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Info("unlock lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Info("unlock lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
return fmt.Errorf("unlock lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnLockType, err.Error()))
|
||||
}
|
||||
|
||||
|
|
@ -212,12 +212,12 @@ func (rl *redissionLocker) UnLock(ctx context.Context) error {
|
|||
rl.cancelRefreshLockTime()
|
||||
}
|
||||
|
||||
rl.logger.Info("unlock lock success", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock lock success", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return nil
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.UnLocakFailureWithLockOccupancy {
|
||||
rl.logger.Info("unlock lock failed", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return fmt.Errorf("unlock lock failed:%w", constant.NewRedisResult(constant.UnLocakFailureWithLockOccupancy, constant.UnLockType, ""))
|
||||
}
|
||||
return nil
|
||||
|
|
@ -246,13 +246,13 @@ func GetLocker(client *redis.Client, ops *RedissionLockConfig) *redissionLocker
|
|||
}
|
||||
|
||||
r := &redissionLocker{
|
||||
token: ops.Token,
|
||||
key: strings.Join([]string{ops.Prefix, ops.Key}, ":"),
|
||||
Token: ops.Token,
|
||||
Key: strings.Join([]string{ops.Prefix, ops.Key}, ":"),
|
||||
waitChanKey: strings.Join([]string{ops.ChanPrefix, ops.Key, "wait"}, ":"),
|
||||
needRefresh: ops.NeedRefresh,
|
||||
client: client,
|
||||
refreshExitChan: make(chan struct{}),
|
||||
logger: logger.GetLoggerInstance(),
|
||||
Logger: logger.GetLoggerInstance(),
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,13 +21,13 @@ type RedissionRWLocker struct {
|
|||
redissionLocker
|
||||
writeWaitChanKey string
|
||||
readWaitChanKey string
|
||||
rwTokenTimeoutPrefix string
|
||||
RWTokenTimeoutPrefix string
|
||||
}
|
||||
|
||||
func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration) error {
|
||||
result := rl.tryRLock(ctx).(*constant.RedisResult)
|
||||
if result.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(result.OutputResultMessage())
|
||||
rl.Logger.Error(result.OutputResultMessage())
|
||||
return fmt.Errorf("get read lock failed:%w", result)
|
||||
}
|
||||
|
||||
|
|
@ -42,7 +42,7 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
|
|||
go rl.refreshLockTimeout(ctx)
|
||||
})
|
||||
}
|
||||
rl.logger.Info("success get the read lock by key and token", zap.String("key", rl.key), zap.String("token", rl.token))
|
||||
rl.Logger.Info("success get the read lock by key and token", zap.String("key", rl.Key), zap.String("token", rl.Token))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -61,18 +61,18 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
|
|||
case _, ok := <-subMsgChan:
|
||||
if !ok {
|
||||
err := errors.New("failed to read the read lock waiting for for the channel message")
|
||||
rl.logger.Error("failed to read the read lock waiting for for the channel message")
|
||||
rl.Logger.Error("failed to read the read lock waiting for for the channel message")
|
||||
return err
|
||||
}
|
||||
|
||||
result := rl.tryRLock(ctx).(*constant.RedisResult)
|
||||
if (result.Code == constant.RLockFailureWithWLockOccupancy) || (result.Code == constant.UnknownInternalError) {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
rl.Logger.Info(result.OutputResultMessage())
|
||||
continue
|
||||
}
|
||||
|
||||
if result.Code == constant.LockSuccess {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
rl.Logger.Info(result.OutputResultMessage())
|
||||
rl.closeSub(sub, rl.subExitChan)
|
||||
|
||||
if rl.needRefresh {
|
||||
|
|
@ -88,7 +88,7 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
|
|||
return nil
|
||||
}
|
||||
case <-acquireTimer.C:
|
||||
rl.logger.Info("the waiting time for obtaining the read lock operation has timed out")
|
||||
rl.Logger.Info("the waiting time for obtaining the read lock operation has timed out")
|
||||
rl.closeSub(sub, rl.subExitChan)
|
||||
// after acquire lock timeout,notice the sub channel to close
|
||||
return constant.AcquireTimeoutErr
|
||||
|
|
@ -101,7 +101,7 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration
|
|||
func (rl *RedissionRWLocker) tryRLock(ctx context.Context) error {
|
||||
lockType := constant.LockType
|
||||
|
||||
res := rl.client.Eval(ctx, luascript.RLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.RLockScript, []string{rl.Key, rl.RWTokenTimeoutPrefix}, rl.lockLeaseTime, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error())
|
||||
|
|
@ -110,9 +110,9 @@ func (rl *RedissionRWLocker) tryRLock(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) {
|
||||
rl.logger.Info("lock refresh by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("lock refresh by key and token", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Second
|
||||
lockTime := time.Duration(rl.lockLeaseTime/3) * time.Millisecond
|
||||
timer := time.NewTimer(lockTime)
|
||||
defer timer.Stop()
|
||||
|
||||
|
|
@ -120,20 +120,20 @@ func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) {
|
|||
select {
|
||||
case <-timer.C:
|
||||
// extend key lease time
|
||||
res := rl.client.Eval(ctx, luascript.RefreshRWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.RefreshRWLockScript, []string{rl.Key, rl.RWTokenTimeoutPrefix}, rl.lockLeaseTime, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Info("lock refresh failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Info("lock refresh failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockFailure {
|
||||
rl.logger.Error("lock refreash failed,can not find the read lock by key and token", zap.String("rwTokenPrefix", rl.rwTokenTimeoutPrefix), zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Error("lock refreash failed,can not find the read lock by key and token", zap.String("rwTokenPrefix", rl.RWTokenTimeoutPrefix), zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.RefreshLockSuccess {
|
||||
rl.logger.Info("lock refresh success by key and token", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("lock refresh success by key and token", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
}
|
||||
timer.Reset(lockTime)
|
||||
case <-rl.refreshExitChan:
|
||||
|
|
@ -143,11 +143,11 @@ func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error {
|
||||
rl.logger.Info("unlock RLock by key and token", zap.String("key", rl.key), zap.String("token", rl.token))
|
||||
res := rl.client.Eval(ctx, luascript.UnRLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.token)
|
||||
rl.Logger.Info("unlock RLock by key and token", zap.String("key", rl.Key), zap.String("token", rl.Token))
|
||||
res := rl.client.Eval(ctx, luascript.UnRLockScript, []string{rl.Key, rl.RWTokenTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Info("unlock read lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnRLockType, err.Error()))
|
||||
}
|
||||
|
||||
|
|
@ -156,12 +156,12 @@ func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error {
|
|||
rl.cancelRefreshLockTime()
|
||||
}
|
||||
|
||||
rl.logger.Info("unlock read lock success", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock read lock success", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return nil
|
||||
}
|
||||
|
||||
if constant.RedisCode(val) == constant.UnRLockFailureWithWLockOccupancy {
|
||||
rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock read lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return fmt.Errorf("unlock read lock failed:%w", constant.NewRedisResult(constant.UnRLockFailureWithWLockOccupancy, constant.UnRLockType, ""))
|
||||
}
|
||||
return nil
|
||||
|
|
@ -170,7 +170,7 @@ func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error {
|
|||
func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration) error {
|
||||
result := rl.tryWLock(ctx).(*constant.RedisResult)
|
||||
if result.Code == constant.UnknownInternalError {
|
||||
rl.logger.Error(result.OutputResultMessage())
|
||||
rl.Logger.Error(result.OutputResultMessage())
|
||||
return fmt.Errorf("get write lock failed:%w", result)
|
||||
}
|
||||
|
||||
|
|
@ -185,7 +185,7 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
|
|||
go rl.refreshLockTimeout(ctx)
|
||||
})
|
||||
}
|
||||
rl.logger.Info("success get the write lock by key and token", zap.String("key", rl.key), zap.String("token", rl.token))
|
||||
rl.Logger.Info("success get the write lock by key and token", zap.String("key", rl.Key), zap.String("token", rl.Token))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -204,18 +204,18 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
|
|||
case _, ok := <-subMsgChan:
|
||||
if !ok {
|
||||
err := errors.New("failed to read the write lock waiting for for the channel message")
|
||||
rl.logger.Error("failed to read the read lock waiting for for the channel message")
|
||||
rl.Logger.Error("failed to read the read lock waiting for for the channel message")
|
||||
return err
|
||||
}
|
||||
|
||||
result := rl.tryWLock(ctx).(*constant.RedisResult)
|
||||
if (result.Code == constant.UnknownInternalError) || (result.Code == constant.WLockFailureWithRLockOccupancy) || (result.Code == constant.WLockFailureWithWLockOccupancy) || (result.Code == constant.WLockFailureWithNotFirstPriority) {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
rl.Logger.Info(result.OutputResultMessage())
|
||||
continue
|
||||
}
|
||||
|
||||
if result.Code == constant.LockSuccess {
|
||||
rl.logger.Info(result.OutputResultMessage())
|
||||
rl.Logger.Info(result.OutputResultMessage())
|
||||
rl.closeSub(sub, rl.subExitChan)
|
||||
|
||||
if rl.needRefresh {
|
||||
|
|
@ -231,7 +231,7 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
|
|||
return nil
|
||||
}
|
||||
case <-acquireTimer.C:
|
||||
rl.logger.Info("the waiting time for obtaining the write lock operation has timed out")
|
||||
rl.Logger.Info("the waiting time for obtaining the write lock operation has timed out")
|
||||
rl.closeSub(sub, rl.subExitChan)
|
||||
// after acquire lock timeout,notice the sub channel to close
|
||||
return constant.AcquireTimeoutErr
|
||||
|
|
@ -244,7 +244,7 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration
|
|||
func (rl *RedissionRWLocker) tryWLock(ctx context.Context) error {
|
||||
lockType := constant.LockType
|
||||
|
||||
res := rl.client.Eval(ctx, luascript.WLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix}, rl.lockLeaseTime, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.WLockScript, []string{rl.Key, rl.RWTokenTimeoutPrefix}, rl.lockLeaseTime, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
return constant.NewRedisResult(constant.UnknownInternalError, lockType, err.Error())
|
||||
|
|
@ -253,10 +253,10 @@ func (rl *RedissionRWLocker) tryWLock(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (rl *RedissionRWLocker) UnWLock(ctx context.Context) error {
|
||||
res := rl.client.Eval(ctx, luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.writeWaitChanKey, rl.readWaitChanKey}, unlockMessage, rl.token)
|
||||
res := rl.client.Eval(ctx, luascript.UnWLockScript, []string{rl.Key, rl.RWTokenTimeoutPrefix, rl.writeWaitChanKey, rl.readWaitChanKey}, unlockMessage, rl.Token)
|
||||
val, err := res.Int()
|
||||
if err != redis.Nil && err != nil {
|
||||
rl.logger.Error("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||
rl.Logger.Error("unlock write lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key), zap.Error(err))
|
||||
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnWLockType, err.Error()))
|
||||
}
|
||||
|
||||
|
|
@ -264,12 +264,12 @@ func (rl *RedissionRWLocker) UnWLock(ctx context.Context) error {
|
|||
if rl.needRefresh && (constant.RedisCode(val) == constant.UnLockSuccess) {
|
||||
rl.cancelRefreshLockTime()
|
||||
}
|
||||
rl.logger.Info("unlock write lock success", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock write lock success", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return nil
|
||||
}
|
||||
|
||||
if (constant.RedisCode(val) == constant.UnWLockFailureWithRLockOccupancy) || (constant.RedisCode(val) == constant.UnWLockFailureWithWLockOccupancy) {
|
||||
rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key))
|
||||
rl.Logger.Info("unlock write lock failed", zap.String("token", rl.Token), zap.String("key", rl.Key))
|
||||
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.RedisCode(val), constant.UnWLockType, ""))
|
||||
}
|
||||
return nil
|
||||
|
|
@ -302,20 +302,20 @@ func GetRWLocker(client *redis.Client, conf *RedissionLockConfig) *RedissionRWLo
|
|||
}
|
||||
|
||||
r := &redissionLocker{
|
||||
token: conf.Token,
|
||||
key: strings.Join([]string{conf.Prefix, conf.Key}, ":"),
|
||||
Token: conf.Token,
|
||||
Key: strings.Join([]string{conf.Prefix, conf.Key}, ":"),
|
||||
needRefresh: conf.NeedRefresh,
|
||||
lockLeaseTime: conf.LockLeaseTime,
|
||||
client: client,
|
||||
refreshOnce: &sync.Once{},
|
||||
logger: logger.GetLoggerInstance(),
|
||||
Logger: logger.GetLoggerInstance(),
|
||||
}
|
||||
|
||||
rwLocker := &RedissionRWLocker{
|
||||
redissionLocker: *r,
|
||||
writeWaitChanKey: strings.Join([]string{conf.ChanPrefix, conf.Key, "write"}, ":"),
|
||||
readWaitChanKey: strings.Join([]string{conf.ChanPrefix, conf.Key, "read"}, ":"),
|
||||
rwTokenTimeoutPrefix: conf.TimeoutPrefix,
|
||||
RWTokenTimeoutPrefix: conf.TimeoutPrefix,
|
||||
}
|
||||
return rwLocker
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package distributedlock
|
||||
package distributedlock_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
dl "modelRT/distributedlock"
|
||||
"modelRT/distributedlock/constant"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
|
@ -39,28 +40,28 @@ func init() {
|
|||
func TestRWLockRLockAndUnRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker.logger = log
|
||||
rwLocker.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// 第一次加读锁
|
||||
err := rwLocker.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey := strings.Join([]string{rwLocker.rwTokenTimeoutPrefix, rwLocker.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
tokenKey := strings.Join([]string{rwLocker.RWTokenTimeoutPrefix, rwLocker.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
err = rwLocker.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, redis.Nil, err)
|
||||
assert.Equal(t, 0, num)
|
||||
t.Log("rwLock rlock and unrlock test success")
|
||||
|
|
@ -70,21 +71,21 @@ func TestRWLockRLockAndUnRLock(t *testing.T) {
|
|||
func TestRWLockReentrantRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker.logger = log
|
||||
rwLocker.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// 第一次加读锁
|
||||
err := rwLocker.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey := strings.Join([]string{rwLocker.rwTokenTimeoutPrefix, rwLocker.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
tokenKey := strings.Join([]string{rwLocker.RWTokenTimeoutPrefix, rwLocker.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -92,7 +93,7 @@ func TestRWLockReentrantRLock(t *testing.T) {
|
|||
err = rwLocker.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 2, num)
|
||||
|
||||
|
|
@ -100,7 +101,7 @@ func TestRWLockReentrantRLock(t *testing.T) {
|
|||
err = rwLocker.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -108,7 +109,7 @@ func TestRWLockReentrantRLock(t *testing.T) {
|
|||
err = rwLocker.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, redis.Nil, err)
|
||||
assert.Equal(t, 0, num)
|
||||
t.Log("rwLock reentrant lock test success")
|
||||
|
|
@ -118,29 +119,30 @@ func TestRWLockReentrantRLock(t *testing.T) {
|
|||
func TestRWLockRefreshRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 10,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker.logger = log
|
||||
rwLocker.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// 第一次加读锁
|
||||
err := rwLocker.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey := strings.Join([]string{rwLocker.rwTokenTimeoutPrefix, rwLocker.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
tokenKey := strings.Join([]string{rwLocker.RWTokenTimeoutPrefix, rwLocker.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
script := `return redis.call('httl', KEYS[1], 'fields', '1', ARGV[1]);`
|
||||
result, err := rdb.Eval(ctx, script, []string{rwLocker.key}, tokenKey).Result()
|
||||
result, err := rdb.Eval(ctx, script, []string{rwLocker.Key}, tokenKey).Result()
|
||||
assert.Equal(t, nil, err)
|
||||
ttls, ok := result.([]interface{})
|
||||
// ttls, ok := result.([]interface{})
|
||||
ttls, ok := result.([]any)
|
||||
assert.Equal(t, true, ok)
|
||||
ttl, ok := ttls[0].(int64)
|
||||
assert.Equal(t, true, ok)
|
||||
|
|
@ -150,7 +152,7 @@ func TestRWLockRefreshRLock(t *testing.T) {
|
|||
err = rwLocker.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, redis.Nil, err)
|
||||
assert.Equal(t, 0, num)
|
||||
t.Log("rwLock refresh lock test success")
|
||||
|
|
@ -160,29 +162,29 @@ func TestRWLockRefreshRLock(t *testing.T) {
|
|||
func TestRWLock2ClientRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// locker1加读锁
|
||||
err := rwLocker1.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -190,14 +192,14 @@ func TestRWLock2ClientRLock(t *testing.T) {
|
|||
err = rwLocker2.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int()
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.RWTokenTimeoutPrefix, rwLocker2.Token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.Key, tokenKey2).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
err = rdb.HLen(ctx, rwLocker1.key).Err()
|
||||
err = rdb.HLen(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
hLen := rdb.HLen(ctx, rwLocker1.key).Val()
|
||||
hLen := rdb.HLen(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(3), hLen)
|
||||
|
||||
// locker1解读锁
|
||||
|
|
@ -208,9 +210,9 @@ func TestRWLock2ClientRLock(t *testing.T) {
|
|||
err = rwLocker2.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
err = rdb.Exists(ctx, rwLocker1.key).Err()
|
||||
err = rdb.Exists(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
existNum := rdb.Exists(ctx, rwLocker1.key).Val()
|
||||
existNum := rdb.Exists(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(0), existNum)
|
||||
t.Log("rwLock 2 client lock test success")
|
||||
return
|
||||
|
|
@ -219,29 +221,29 @@ func TestRWLock2ClientRLock(t *testing.T) {
|
|||
func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 30,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// locker1加读锁
|
||||
err := rwLocker1.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -249,20 +251,21 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
|||
err = rwLocker2.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int()
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.RWTokenTimeoutPrefix, rwLocker2.Token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.Key, tokenKey2).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
err = rdb.HLen(ctx, rwLocker1.key).Err()
|
||||
err = rdb.HLen(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
hLen := rdb.HLen(ctx, rwLocker1.key).Val()
|
||||
hLen := rdb.HLen(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(3), hLen)
|
||||
|
||||
script := `return redis.call('httl', KEYS[1], 'fields', '1', ARGV[1]);`
|
||||
result, err := rdb.Eval(ctx, script, []string{rwLocker1.key}, tokenKey1).Result()
|
||||
result, err := rdb.Eval(ctx, script, []string{rwLocker1.Key}, tokenKey1).Result()
|
||||
assert.Equal(t, nil, err)
|
||||
ttls, ok := result.([]interface{})
|
||||
// ttls, ok := result.([]interface{})
|
||||
ttls, ok := result.([]any)
|
||||
assert.Equal(t, true, ok)
|
||||
ttl, ok := ttls[0].(int64)
|
||||
assert.Equal(t, true, ok)
|
||||
|
|
@ -273,16 +276,16 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
|||
err = rwLocker1.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
hashTTL := rdb.TTL(ctx, rwLocker1.key).Val().Seconds()
|
||||
hashTTL := rdb.TTL(ctx, rwLocker1.Key).Val().Seconds()
|
||||
assert.Greater(t, hashTTL, float64(20))
|
||||
|
||||
// locker2解读锁
|
||||
err = rwLocker2.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
err = rdb.Exists(ctx, rwLocker1.key).Err()
|
||||
err = rdb.Exists(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
existNum := rdb.Exists(ctx, rwLocker1.key).Val()
|
||||
existNum := rdb.Exists(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(0), existNum)
|
||||
t.Log("rwLock 2 client lock test success")
|
||||
return
|
||||
|
|
@ -291,29 +294,29 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
|||
func TestRWLock2CWithTimeTransformRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 30,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// locker1加读锁
|
||||
err := rwLocker1.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -321,17 +324,17 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) {
|
|||
err = rwLocker2.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int()
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.RWTokenTimeoutPrefix, rwLocker2.Token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.Key, tokenKey2).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
err = rdb.HLen(ctx, rwLocker1.key).Err()
|
||||
err = rdb.HLen(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
hLen := rdb.HLen(ctx, rwLocker1.key).Val()
|
||||
hLen := rdb.HLen(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(3), hLen)
|
||||
|
||||
hashTTL := rdb.TTL(ctx, rwLocker2.key).Val().Seconds()
|
||||
hashTTL := rdb.TTL(ctx, rwLocker2.Key).Val().Seconds()
|
||||
assert.Greater(t, hashTTL, float64(100))
|
||||
|
||||
// locker2解读锁
|
||||
|
|
@ -339,16 +342,16 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) {
|
|||
assert.Equal(t, nil, err)
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
hashTTL = rdb.TTL(ctx, rwLocker1.key).Val().Seconds()
|
||||
hashTTL = rdb.TTL(ctx, rwLocker1.Key).Val().Seconds()
|
||||
assert.Greater(t, hashTTL, float64(15))
|
||||
|
||||
// locker1解读锁
|
||||
err = rwLocker1.UnRLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
err = rdb.Exists(ctx, rwLocker1.key).Err()
|
||||
err = rdb.Exists(ctx, rwLocker1.Key).Err()
|
||||
assert.Equal(t, nil, err)
|
||||
existNum := rdb.Exists(ctx, rwLocker1.key).Val()
|
||||
existNum := rdb.Exists(ctx, rwLocker1.Key).Val()
|
||||
assert.Equal(t, int64(0), existNum)
|
||||
t.Log("rwLock 2 client lock test success")
|
||||
return
|
||||
|
|
@ -357,28 +360,28 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) {
|
|||
func TestRWLockWLockAndUnWLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker.logger = log
|
||||
rwLocker.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// 第一次加读锁
|
||||
err := rwLocker.WLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey := strings.Join([]string{rwLocker.rwTokenTimeoutPrefix, rwLocker.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
tokenKey := strings.Join([]string{rwLocker.RWTokenTimeoutPrefix, rwLocker.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
err = rwLocker.UnWLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, redis.Nil, err)
|
||||
assert.Equal(t, 0, num)
|
||||
t.Log("rwLock rlock and unrlock test success")
|
||||
|
|
@ -388,21 +391,21 @@ func TestRWLockWLockAndUnWLock(t *testing.T) {
|
|||
func TestRWLockReentrantWLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker.logger = log
|
||||
rwLocker.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// 第一次加写锁
|
||||
err := rwLocker.WLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey := strings.Join([]string{rwLocker.rwTokenTimeoutPrefix, rwLocker.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
tokenKey := strings.Join([]string{rwLocker.RWTokenTimeoutPrefix, rwLocker.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -410,7 +413,7 @@ func TestRWLockReentrantWLock(t *testing.T) {
|
|||
err = rwLocker.WLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 2, num)
|
||||
|
||||
|
|
@ -418,7 +421,7 @@ func TestRWLockReentrantWLock(t *testing.T) {
|
|||
err = rwLocker.UnWLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -426,7 +429,7 @@ func TestRWLockReentrantWLock(t *testing.T) {
|
|||
err = rwLocker.UnWLock(ctx)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
num, err = rdb.HGet(ctx, rwLocker.key, tokenKey).Int()
|
||||
num, err = rdb.HGet(ctx, rwLocker.Key, tokenKey).Int()
|
||||
assert.Equal(t, redis.Nil, err)
|
||||
assert.Equal(t, 0, num)
|
||||
t.Log("rwLock reentrant lock test success")
|
||||
|
|
@ -436,29 +439,29 @@ func TestRWLockReentrantWLock(t *testing.T) {
|
|||
func TestRWLock2CWithRLockAndWLockFailed(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 30,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// locker1加读锁
|
||||
err := rwLocker1.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -476,28 +479,28 @@ func TestRWLock2CWithRLockAndWLockFailed(t *testing.T) {
|
|||
|
||||
func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
duration := 10 * time.Second
|
||||
// locker1加读锁
|
||||
err := rwLocker1.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -513,8 +516,8 @@ func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) {
|
|||
err = rwLocker2.WLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int()
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.RWTokenTimeoutPrefix, rwLocker2.Token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.Key, tokenKey2).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -529,29 +532,29 @@ func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) {
|
|||
func TestRWLock2CWithWLockAndRLock(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker1 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 120,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a",
|
||||
})
|
||||
rwLocker1.logger = log
|
||||
rwLocker1.Logger = log
|
||||
|
||||
rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{
|
||||
rwLocker2 := dl.GetRWLocker(rdb, &dl.RedissionLockConfig{
|
||||
LockLeaseTime: 30,
|
||||
NeedRefresh: true,
|
||||
Key: "component",
|
||||
Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577",
|
||||
})
|
||||
rwLocker2.logger = log
|
||||
rwLocker2.Logger = log
|
||||
|
||||
duration := 10 * time.Second
|
||||
// locker1加写锁
|
||||
err := rwLocker1.WLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int()
|
||||
tokenKey1 := strings.Join([]string{rwLocker1.RWTokenTimeoutPrefix, rwLocker1.Token}, ":")
|
||||
num, err := rdb.HGet(ctx, rwLocker1.Key, tokenKey1).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
|
|
@ -567,8 +570,8 @@ func TestRWLock2CWithWLockAndRLock(t *testing.T) {
|
|||
err = rwLocker2.RLock(ctx, duration)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int()
|
||||
tokenKey2 := strings.Join([]string{rwLocker2.RWTokenTimeoutPrefix, rwLocker2.Token}, ":")
|
||||
num, err = rdb.HGet(ctx, rwLocker2.Key, tokenKey2).Int()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, 1, num)
|
||||
|
||||
Loading…
Reference in New Issue