diff --git a/distributedlock/constant/redis_result.go b/distributedlock/constant/redis_result.go index d389a39..dbcef53 100644 --- a/distributedlock/constant/redis_result.go +++ b/distributedlock/constant/redis_result.go @@ -81,7 +81,7 @@ func NewRedisResult(res RedisCode, lockType RedisLockType, redisMsg string) erro case -9: return &RedisResult{Code: res, Message: "redis lock failure,the lock is already occupied by another processes lock"} case -99: - return &RedisResult{Code: res, Message: "redis internal execution error"} + return &RedisResult{Code: res, Message: fmt.Sprintf("redis internal execution error:%v\n", redisMsg)} default: msg := "unkown redis execution result" if redisMsg != "" { diff --git a/distributedlock/luascript/rwlock_script.go b/distributedlock/luascript/rwlock_script.go index 814c559..2374e4b 100644 --- a/distributedlock/luascript/rwlock_script.go +++ b/distributedlock/luascript/rwlock_script.go @@ -14,7 +14,7 @@ local lockKey = KEYS[2] .. ':' .. ARGV[2]; if (mode == false) then redis.call('hset', KEYS[1], 'mode', 'read'); redis.call('hset', KEYS[1], lockKey, '1'); - redis.call('hexpire', KEYS[1], ARGV[1] 'fields' '1' lockKey); + redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey); redis.call('expire', KEYS[1], ARGV[1]); return 1; end; @@ -30,10 +30,10 @@ if (mode == 'read') then if (redis.call('exists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], lockKey, '1'); local remainTime = redis.call('httl', KEYS[1], 'fields', '1', lockKey); - redis.call('hexpire', key, math.max(remainTime, ARGV[1])); + redis.call('hexpire', KEYS[1], math.max(remainTime, ARGV[1]), 'fields', '1', lockKey); else redis.call('hset', KEYS[1], lockKey, '1'); - redis.call('hexpire', KEYS[1], lockKey, ARGV[1]); + redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey); end; local cursor = 0; local maxRemainTime = tonumber(ARGV[1]); @@ -142,7 +142,7 @@ if (mode == false) then end; redis.call('hset', KEYS[1], 'mode', 'write'); redis.call('hset', KEYS[1], lockKey, 1); - redis.call('hexpire', KEYS[1], ARGV[1] 'fields' '1' lockKey); + redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey); redis.call('expire', KEYS[1], ARGV[1]); redis.call('lpop', waitKey, '1') return 1; @@ -156,7 +156,7 @@ else local lockExists = redis.call('hexists', KEYS[1], lockKey) if (lockExists == 1) then redis.call('hincrby', KEYS[1], lockKey, 1); - redis.call('hexpire', KEYS[1], ARGV[1] 'fields' '1' lockKey); + redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey); redis.call('expire', KEYS[1], ARGV[1]); return 1; end; @@ -219,11 +219,11 @@ var RefreshRWLockScript = ` local lockKey = KEYS[2] .. ':' .. ARGV[2] local lockExists = redis.call('hexists', KEYS[1], lockKey); local mode = redis.call('hget', KEYS[1], 'mode') +local maxRemainTime = tonumber(ARGV[1]); if (lockExists == 1) then - redis.call('hexpire', KEYS[1], ARGV[1] 'fields' '1' lockKey); + redis.call('hexpire', KEYS[1], ARGV[1], 'fields', '1', lockKey); if (mode == 'read' ) then local cursor = 0; - local maxRemainTime = tonumber(ARGV[1]); local pattern = KEYS[2] .. ':*'; repeat local hscanResult = redis.call('hscan', KEYS[1], cursor, 'match', pattern, 'count', '100'); @@ -236,11 +236,15 @@ if (lockExists == 1) then maxRemainTime = math.max(tonumber(remainTime[1]), maxRemainTime); end; until cursor == 0; + if (maxRemainTime > 0) then local remainTime = redis.call('ttl', KEYS[1]); redis.call('expire', KEYS[1], math.max(tonumber(remainTime),maxRemainTime)); end; + elseif (mode == 'write') then + redis.call('expire', KEYS[1], ARGV[1]); end; + -- return redis.call('ttl',KEYS[1]); return 1; end; return -8; diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index 9653589..8e55b29 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -21,8 +21,9 @@ const ( unlockMessage = 0 ) +// RedissionLockConfig define redission lock config type RedissionLockConfig struct { - LockLeaseTime time.Duration + LockLeaseTime uint64 Prefix string ChanPrefix string TimeoutPrefix string diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index 529ff75..caee734 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "modelRT/distributedlock/constant" @@ -107,8 +108,8 @@ func (rl *RedissionRWLocker) refreshLockTimeout() { } if constant.RedisCode(val) == constant.RefreshLockFailure { - rl.logger.Error("lock refreash failed,can not find the read lock by key and token", zap.String("token", rl.token), zap.String("key", rl.key)) - break + rl.logger.Error("lock refreash failed,can not find the read lock by key and token", zap.String("rwTokenPrefix", rl.rwTokenTimeoutPrefix), zap.String("token", rl.token), zap.String("key", rl.key)) + return } if constant.RedisCode(val) == constant.RefreshLockSuccess { @@ -241,6 +242,7 @@ func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLoc needRefresh: true, client: client, exit: make(chan struct{}), + once: &sync.Once{}, logger: logger.GetLoggerInstance(), } @@ -257,10 +259,11 @@ func GetRWLocker(client *redis.Client, ops *RedissionLockConfig) *RedissionRWLoc } if ops.LockLeaseTime == 0 { - r.lockLeaseTime = internalLockLeaseTime + ops.LockLeaseTime = internalLockLeaseTime } r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":") + r.lockLeaseTime = ops.LockLeaseTime r.waitChanKey = strings.Join([]string{ops.ChanPrefix, ops.Key, "write"}, ":") rwLocker := &RedissionRWLocker{ diff --git a/distributedlock/rwlock_test.go b/distributedlock/rwlock_test.go new file mode 100644 index 0000000..8ecaaaa --- /dev/null +++ b/distributedlock/rwlock_test.go @@ -0,0 +1,46 @@ +package distributed_lock + +import ( + "testing" + "time" + + "github.com/go-redis/redis" + "go.uber.org/zap" +) + +var log *zap.Logger + +func init() { + log = zap.Must(zap.NewDevelopment()) +} + +func TestRWLockReentrantLock(t *testing.T) { + rdb := redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: "192.168.2.103:6379", + Password: "cnstar", + PoolSize: 50, + DialTimeout: 10 * time.Second, + }) + + rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ + LockLeaseTime: 120, + Key: "component", + }) + + rwLocker.logger = log + t.Logf("%+v\n", rwLocker) + + duration := 10 * time.Second + // 第一次加读锁 + err := rwLocker.RLock(duration) + t.Logf("err:%+v\n", err) + // TODO 实现可重入读锁测试 + // rwLocker.UnRLock() + // // 第二次加读锁 + // rwLocker.RLock(duration) + // // 查看 redis 中相关 key 的值 + // rwLocker.UnRLock() + t.Log("test success") + select {} +}