diff --git a/distributedlock/constant/redis_result.go b/distributedlock/constant/redis_result.go index 7c689fe..18eb21d 100644 --- a/distributedlock/constant/redis_result.go +++ b/distributedlock/constant/redis_result.go @@ -78,9 +78,9 @@ func NewRedisResult(res RedisCode, lockType RedisLockType, redisMsg string) erro case -4: return &RedisResult{Code: res, Message: "redis lock write lock failure,the lock is already occupied by anthor processes write lock"} case -5: - return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes read lock"} + return &RedisResult{Code: res, Message: "redis unlock write lock failure,the lock is already occupied by another processes read lock"} case -6: - return &RedisResult{Code: res, Message: "redis un lock write lock failure,the lock is already occupied by another processes write lock"} + return &RedisResult{Code: res, Message: "redis unlock write lock failure,the lock is already occupied by another processes write lock"} case -7: return &RedisResult{Code: res, Message: "redis lock write lock failure,the first priority in the current process non-waiting queue"} case -8: diff --git a/distributedlock/luascript/rwlock_script.go b/distributedlock/luascript/rwlock_script.go index ac07244..3af9e27 100644 --- a/distributedlock/luascript/rwlock_script.go +++ b/distributedlock/luascript/rwlock_script.go @@ -60,7 +60,7 @@ end; /* KEYS[1]:锁的键名(key),通常是锁的唯一标识。 KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 +KEYS[3]:锁的释放通知写频道(chankey),用于通知其他写等待客户端锁已释放。 ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ @@ -69,10 +69,10 @@ local lockKey = KEYS[2] .. ':' .. ARGV[2]; local mode = redis.call('hget', KEYS[1], 'mode'); if (mode == false) then local writeWait = KEYS[1] .. ':write'; - -- 优先写锁加锁,无写锁的情况通知读锁加锁 + -- 优先写锁加锁 local counter = redis.call('llen',writeWait); if (counter >= 1) then - redis.call('publish', KEYS[4], ARGV[1]); + redis.call('publish', KEYS[3], ARGV[1]); end; return 1; elseif (mode == 'write') then @@ -119,7 +119,7 @@ if (redis.call('hlen', KEYS[1]) > 1) then else redis.call('del', KEYS[1]); local writeWait = KEYS[1] .. ':write'; - -- 优先写锁加锁,无写锁的情况通知读锁加锁 + -- 优先写锁加锁 local counter = redis.call('llen',writeWait); if (counter >= 1) then redis.call('publish', KEYS[3], ARGV[1]); @@ -178,7 +178,8 @@ end; /* KEYS[1]:锁的键名(key),通常是锁的唯一标识。 KEYS[2]:锁的超时键名前缀(rwTimeoutPrefix),用于存储每个读锁的超时键。 -KEYS[3]:锁的释放通知写频道(chankey),用于通知其他客户端锁已释放。 +KEYS[3]:锁的释放通知写频道(writeChankey),用于通知其他写等待客户端锁已释放。 +KEYS[4]:锁的释放通知读频道(readChankey),用于通知其他读等待客户端锁已释放。 ARGV[1]:解锁消息(unlockMessage),用于通知其他客户端锁已释放。 ARGV[2]:当前客户端的唯一标识(token),用于区分不同的客户端。 */ @@ -190,6 +191,8 @@ if (mode == false) then local counter = redis.call('llen',writeWait); if (counter >= 1) then redis.call('publish', KEYS[3], ARGV[1]); + else + redis.call('publish', KEYS[4], ARGV[1]); end; return 1; elseif (mode == 'read') then @@ -204,9 +207,9 @@ else redis.call('del', KEYS[1]); local counter = redis.call('llen',writeWait); if (counter >= 1) then - redis.call('publish', KEYS[4], ARGV[1]); - else redis.call('publish', KEYS[3], ARGV[1]); + else + redis.call('publish', KEYS[4], ARGV[1]); end; return 1; end; diff --git a/distributedlock/redis_lock.go b/distributedlock/redis_lock.go index 0f6a5b9..567160c 100644 --- a/distributedlock/redis_lock.go +++ b/distributedlock/redis_lock.go @@ -100,23 +100,42 @@ func (rl *redissionLocker) Lock(ctx context.Context, timeout ...time.Duration) e return fmt.Errorf("lock the redis lock failed:%w", result) } +// TODO 优化订阅流程 func (rl *redissionLocker) subscribeLock(ctx context.Context, sub *redis.PubSub, out chan struct{}) { if sub == nil || out == nil { return } rl.logger.Info("lock: enter sub routine", zap.String("token", rl.token)) - for { - msg, err := sub.Receive(ctx) - if err != nil { - rl.logger.Info("sub receive message failed", zap.Error(err)) - continue - } + // subCh := sub.Channel() + // for msg := range subCh { + // // 这里只会收到真正的数据消息 + // fmt.Printf("Channel: %s, Payload: %s\n", + // msg.Channel, + // msg.Payload) + // } + receiveChan := make(chan interface{}, 2) + go func() { + for { + msg, err := sub.Receive(ctx) + if err != nil { + if errors.Is(err, redis.ErrClosed) { + return + } + rl.logger.Error("sub receive message failed", zap.Error(err)) + continue + } + rl.logger.Info("sub receive message", zap.Any("msg", msg)) + receiveChan <- msg + } + }() + + for { select { case <-rl.exit: return - default: + case msg := <-receiveChan: switch msg.(type) { case *redis.Subscription: // Ignore. @@ -126,6 +145,8 @@ func (rl *redissionLocker) subscribeLock(ctx context.Context, sub *redis.PubSub, out <- struct{}{} default: } + // case <-subCh: + // out <- struct{}{} } } } diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index 66bfc28..af8e797 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -19,6 +19,8 @@ import ( type RedissionRWLocker struct { redissionLocker + writeWaitChanKey string + readWaitChanKey string rwTokenTimeoutPrefix string } @@ -46,7 +48,7 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration subMsg := make(chan struct{}, 1) defer close(subMsg) - sub := rl.client.Subscribe(ctx, rl.waitChanKey) + sub := rl.client.Subscribe(ctx, rl.readWaitChanKey) defer sub.Close() go rl.subscribeLock(ctx, sub, subMsg) @@ -127,7 +129,7 @@ func (rl *RedissionRWLocker) refreshLockTimeout(ctx context.Context) { func (rl *RedissionRWLocker) UnRLock(ctx context.Context) error { rl.logger.Info("unlock RLock by key and token", zap.String("key", rl.key), zap.String("token", rl.token)) - res := rl.client.Eval(ctx, luascript.UnRLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) + res := rl.client.Eval(ctx, luascript.UnRLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.writeWaitChanKey}, unlockMessage, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { rl.logger.Info("unlock read lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) @@ -174,7 +176,7 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration subMsg := make(chan struct{}, 1) defer close(subMsg) - sub := rl.client.Subscribe(ctx, rl.waitChanKey) + sub := rl.client.Subscribe(ctx, rl.writeWaitChanKey) defer sub.Close() go rl.subscribeLock(ctx, sub, subMsg) @@ -220,7 +222,7 @@ func (rl *RedissionRWLocker) tryWLock(ctx context.Context) error { } func (rl *RedissionRWLocker) UnWLock(ctx context.Context) error { - res := rl.client.Eval(ctx, luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token) + res := rl.client.Eval(ctx, luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.writeWaitChanKey, rl.readWaitChanKey}, unlockMessage, rl.token) val, err := res.Int() if err != redis.Nil && err != nil { rl.logger.Error("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err)) @@ -273,7 +275,6 @@ func GetRWLocker(client *redis.Client, conf *RedissionLockConfig) *RedissionRWLo key: strings.Join([]string{conf.Prefix, conf.Key}, ":"), needRefresh: conf.NeedRefresh, lockLeaseTime: conf.LockLeaseTime, - waitChanKey: strings.Join([]string{conf.ChanPrefix, conf.Key, "write"}, ":"), client: client, exit: make(chan struct{}), once: &sync.Once{}, @@ -282,6 +283,8 @@ func GetRWLocker(client *redis.Client, conf *RedissionLockConfig) *RedissionRWLo rwLocker := &RedissionRWLocker{ redissionLocker: *r, + writeWaitChanKey: strings.Join([]string{conf.ChanPrefix, conf.Key, "write"}, ":"), + readWaitChanKey: strings.Join([]string{conf.ChanPrefix, conf.Key, "read"}, ":"), rwTokenTimeoutPrefix: conf.TimeoutPrefix, } return rwLocker diff --git a/distributedlock/rwlock_test.go b/distributedlock/rwlock_test.go index c64cccc..82bc3d2 100644 --- a/distributedlock/rwlock_test.go +++ b/distributedlock/rwlock_test.go @@ -2,7 +2,6 @@ package distributedlock import ( "context" - "fmt" "strings" "testing" "time" @@ -14,20 +13,25 @@ import ( "go.uber.org/zap" ) -var log *zap.Logger +var ( + log *zap.Logger + rdb *redis.Client +) func init() { log = zap.Must(zap.NewDevelopment()) + rdb = redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: "192.168.2.104:30001", + PoolSize: 50, + DialTimeout: 10 * time.Second, + MaxIdleConns: 10, + MaxActiveConns: 40, + }) } func TestRWLockRLockAndUnRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -59,12 +63,6 @@ func TestRWLockRLockAndUnRLock(t *testing.T) { func TestRWLockReentrantRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -113,12 +111,6 @@ func TestRWLockReentrantRLock(t *testing.T) { func TestRWLockRefreshRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 10, @@ -161,12 +153,6 @@ func TestRWLockRefreshRLock(t *testing.T) { func TestRWLock2ClientRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -226,12 +212,6 @@ func TestRWLock2ClientRLock(t *testing.T) { func TestRWLock2CWith2DifTimeRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -304,12 +284,6 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) { func TestRWLock2CWithTimeTransformRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 30, @@ -376,12 +350,6 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) { func TestRWLockWLockAndUnWLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -413,12 +381,6 @@ func TestRWLockWLockAndUnWLock(t *testing.T) { func TestRWLockReentrantWLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -465,15 +427,8 @@ func TestRWLockReentrantWLock(t *testing.T) { return } -// TODO 设计两个客户端,C1先加读锁与C2后加写锁 -func TestRWLock2CWithRLockAndWLock(t *testing.T) { +func TestRWLock2CWithRLockAndWLockFailed(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120, @@ -501,19 +456,9 @@ func TestRWLock2CWithRLockAndWLock(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, 1, num) - // go func() { - // // locker1解写锁 - // time.Sleep(10 * time.Second) - // err = rwLocker1.UnRLock(ctx) - // assert.Equal(t, nil, err) - // }() - // locker2加写锁锁 duration = 10 * time.Second err = rwLocker2.WLock(ctx, duration) - // 预测加写锁失败 - // TODO 优化输出 - fmt.Printf("wlock err:%v\n", err) assert.Equal(t, constant.AcquireTimeoutErr, err) err = rwLocker1.UnRLock(ctx) @@ -523,15 +468,63 @@ func TestRWLock2CWithRLockAndWLock(t *testing.T) { return } +// TODO 设计两个客户端,C1先加读锁成功与C2后加写锁成功 +func TestRWLock2CWithRLockAndWLockSucceed(t *testing.T) { + ctx := context.TODO() + rdb.Conn() + rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ + LockLeaseTime: 120, + NeedRefresh: true, + Key: "component", + Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a", + }) + rwLocker1.logger = log + + rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{ + LockLeaseTime: 120, + NeedRefresh: true, + Key: "component", + Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577", + }) + rwLocker2.logger = log + duration := 10 * time.Second + // locker1加读锁 + err := rwLocker1.RLock(ctx, duration) + assert.Equal(t, nil, err) + + tokenKey1 := strings.Join([]string{rwLocker1.rwTokenTimeoutPrefix, rwLocker1.token}, ":") + num, err := rdb.HGet(ctx, rwLocker1.key, tokenKey1).Int() + assert.Equal(t, nil, err) + assert.Equal(t, 1, num) + + go func() { + // locker1解写锁 + time.Sleep(10 * time.Second) + err = rwLocker1.UnRLock(ctx) + assert.Equal(t, nil, err) + }() + + // locker2加写锁 + duration = 30 * time.Second + err = rwLocker2.WLock(ctx, duration) + assert.Equal(t, nil, err) + + tokenKey2 := strings.Join([]string{rwLocker2.rwTokenTimeoutPrefix, rwLocker2.token}, ":") + num, err = rdb.HGet(ctx, rwLocker2.key, tokenKey2).Int() + assert.Equal(t, nil, err) + assert.Equal(t, 1, num) + + // locker2解写锁 + err = rwLocker2.UnWLock(ctx) + assert.Equal(t, nil, err) + + t.Log("rwLock 2 client lock test success") + return +} + // TODO 设计两个客户端,C1先加写锁与C2后加读锁 func TestRWLock2CWithWLockAndRLock(t *testing.T) { ctx := context.TODO() - rdb := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: "192.168.2.104:30001", - PoolSize: 50, - DialTimeout: 10 * time.Second, - }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ LockLeaseTime: 120,