diff --git a/distributedlock/luascript/rwlock_script.go b/distributedlock/luascript/rwlock_script.go index 3af9e27..db58a52 100644 --- a/distributedlock/luascript/rwlock_script.go +++ b/distributedlock/luascript/rwlock_script.go @@ -79,10 +79,10 @@ elseif (mode == 'write') then return -2; end; --- 判断当前的确是读模式但是当前 token 并没有加读锁的情况,返回 1 +-- 判断当前的确是读模式但是当前 token 并没有加读锁的情况,返回 0 local lockExists = redis.call('hexists', KEYS[1], lockKey); if ((mode == 'read') and (lockExists == 0)) then - return 1; + return 0; end; local counter = redis.call('hincrby', KEYS[1], lockKey, -1); diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index f96522e..8b312ef 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -167,7 +167,10 @@ func (rl *redissionLocker) cancelRefreshLockTime() { func (rl *redissionLocker) closeSub(sub *redis.PubSub, noticeChan chan struct{}) { if sub != nil { - sub.Close() + err := sub.Close() + if err != nil { + rl.logger.Error("close sub failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) + } } if noticeChan != nil { diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index f34c839..a94d566 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -74,6 +74,17 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration if result.Code == constant.LockSuccess { rl.logger.Info(result.OutputResultMessage()) rl.closeSub(sub, rl.subExitChan) + + if rl.needRefresh { + rl.refreshOnce.Do(func() { + if rl.refreshExitChan == nil { + rl.refreshExitChan = make(chan struct{}) + } + + // async refresh lock timeout unitl receive exit singal + go rl.refreshLockTimeout(ctx) + }) + } return nil } case <-acquireTimer.C: @@ -206,6 +217,17 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration if result.Code == constant.LockSuccess { rl.logger.Info(result.OutputResultMessage()) rl.closeSub(sub, rl.subExitChan) + + if rl.needRefresh { + rl.refreshOnce.Do(func() { + if rl.refreshExitChan == nil { + rl.refreshExitChan = make(chan struct{}) + } + + // async refresh lock timeout unitl receive exit singal + go rl.refreshLockTimeout(ctx) + }) + } return nil } case <-acquireTimer.C: diff --git a/distributedlock/rwlock_test.go b/distributedlock/rwlock_test.go index 341abec..fc1c07b 100644 --- a/distributedlock/rwlock_test.go +++ b/distributedlock/rwlock_test.go @@ -526,7 +526,6 @@ func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) { return } -// TODO 设计两个客户端,C1先加写锁与C2后加读锁 func TestRWLock2CWithWLockAndRLock(t *testing.T) { ctx := context.TODO() @@ -547,7 +546,7 @@ func TestRWLock2CWithWLockAndRLock(t *testing.T) { rwLocker2.logger = log duration := 10 * time.Second - // locker1加读锁 + // locker1加写锁 err := rwLocker1.WLock(ctx, duration) assert.Equal(t, nil, err) @@ -556,14 +555,25 @@ func TestRWLock2CWithWLockAndRLock(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, 1, num) + go func() { + // locker1解写锁 + time.Sleep(10 * time.Second) + err = rwLocker1.UnWLock(ctx) + assert.Equal(t, nil, err) + }() + // locker2加读锁 - duration = 2 * time.Second + duration = 30 * time.Second err = rwLocker2.RLock(ctx, duration) - // TODO 预测加读锁失败 assert.Equal(t, nil, err) - // locker1解写锁 - err = rwLocker1.UnWLock(ctx) + tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":") + num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int() + assert.Equal(t, nil, err) + assert.Equal(t, 1, num) + + // locker2解读锁 + err = rwLocker2.UnRLock(ctx) assert.Equal(t, nil, err) t.Log("rwLock 2 client lock test success")