From b27b999873ec599c00bbc618173f19a26ad213ca Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 2 Apr 2025 16:47:51 +0800 Subject: [PATCH] add redis read and write lock conflict test of rwlocker --- diagram/hash_test.go | 33 +++++++++ distributedlock/constant/lock_err.go | 6 ++ distributedlock/luascript/rwlock_script.go | 4 +- distributedlock/redis_rwlock.go | 18 ++--- distributedlock/rwlock_test.go | 78 ++++++++++------------ 5 files changed, 86 insertions(+), 53 deletions(-) create mode 100644 diagram/hash_test.go create mode 100644 distributedlock/constant/lock_err.go diff --git a/diagram/hash_test.go b/diagram/hash_test.go new file mode 100644 index 0000000..ed320f3 --- /dev/null +++ b/diagram/hash_test.go @@ -0,0 +1,33 @@ +package diagram + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestHMSet(t *testing.T) { + rdb := redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: "192.168.2.104:6379", + Password: "cnstar", + PoolSize: 50, + DialTimeout: 10 * time.Second, + }) + params := map[string]interface{}{ + "field1": "Hello1", + "field2": "World1", + "field3": 11, + } + + ctx := context.Background() + res, err := rdb.HSet(ctx, "myhash", params).Result() + if err != nil { + fmt.Printf("err:%v\n", err) + } + fmt.Printf("res:%v\n", res) + return +} diff --git a/distributedlock/constant/lock_err.go b/distributedlock/constant/lock_err.go new file mode 100644 index 0000000..fd831f2 --- /dev/null +++ b/distributedlock/constant/lock_err.go @@ -0,0 +1,6 @@ +package constant + +import "errors" + +// AcquireTimeoutErr define error of get lock timeout +var AcquireTimeoutErr = errors.New("the waiting time for obtaining the lock operation has timed out") diff --git a/distributedlock/luascript/rwlock_script.go b/distributedlock/luascript/rwlock_script.go index a103ecc..ac07244 100644 --- a/distributedlock/luascript/rwlock_script.go +++ b/distributedlock/luascript/rwlock_script.go @@ -122,8 +122,6 @@ else -- 优先写锁加锁,无写锁的情况通知读锁加锁 local counter = redis.call('llen',writeWait); if (counter >= 1) then - redis.call('publish', KEYS[4], ARGV[1]); - else redis.call('publish', KEYS[3], ARGV[1]); end; return 1; @@ -157,7 +155,7 @@ if (mode == false) then return 1; elseif (mode == 'read') then -- 放到 list 中等待读锁释放后再次尝试加锁并且订阅读锁释放的消息 - redis.call('rpush', waitkey, ARGV[2]); + redis.call('rpush', waitKey, ARGV[2]); return -3; else -- 可重入写锁逻辑 diff --git a/distributedlock/redis_rwlock.go b/distributedlock/redis_rwlock.go index d873b01..66bfc28 100644 --- a/distributedlock/redis_rwlock.go +++ b/distributedlock/redis_rwlock.go @@ -40,7 +40,7 @@ func (rl *RedissionRWLocker) RLock(ctx context.Context, timeout ...time.Duration go rl.refreshLockTimeout(ctx) }) } - rl.logger.Info("success get the read by key and token", zap.String("key", rl.key), zap.String("token", rl.token)) + rl.logger.Info("success get the read lock by key and token", zap.String("key", rl.key), zap.String("token", rl.token)) return nil } @@ -161,11 +161,14 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration return fmt.Errorf("get write lock failed:%w", result) } - if (result.Code == constant.LockSuccess) && rl.needRefresh { - rl.once.Do(func() { - // async refresh lock timeout unitl receive exit singal - go rl.refreshLockTimeout(ctx) - }) + if result.Code == constant.LockSuccess { + if rl.needRefresh { + rl.once.Do(func() { + // async refresh lock timeout unitl receive exit singal + go rl.refreshLockTimeout(ctx) + }) + } + rl.logger.Info("success get the write lock by key and token", zap.String("key", rl.key), zap.String("token", rl.token)) return nil } @@ -197,9 +200,8 @@ func (rl *RedissionRWLocker) WLock(ctx context.Context, timeout ...time.Duration return nil } case <-acquireTimer.C: - err := errors.New("the waiting time for obtaining the write lock operation has timed out") rl.logger.Info("the waiting time for obtaining the write lock operation has timed out") - return err + return constant.AcquireTimeoutErr } } } diff --git a/distributedlock/rwlock_test.go b/distributedlock/rwlock_test.go index ffc9bee..c64cccc 100644 --- a/distributedlock/rwlock_test.go +++ b/distributedlock/rwlock_test.go @@ -2,10 +2,13 @@ package distributedlock import ( "context" + "fmt" "strings" "testing" "time" + "modelRT/distributedlock/constant" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -21,8 +24,7 @@ func TestRWLockRLockAndUnRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -59,8 +61,7 @@ func TestRWLockReentrantRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -114,8 +115,7 @@ func TestRWLockRefreshRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -163,8 +163,7 @@ func TestRWLock2ClientRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -229,8 +228,7 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -304,19 +302,17 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) { return } -// TODO 设计两个客户端分别加时间不同的读锁,测试ttl时间在有一个key删除后是否可以变换成功 func TestRWLock2CWithTimeTransformRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) rwLocker1 := GetRWLocker(rdb, &RedissionLockConfig{ - LockLeaseTime: 120, + LockLeaseTime: 30, NeedRefresh: true, Key: "component", Token: "fd348a84-e07c-4a61-8c19-f753e6bc556a", @@ -324,7 +320,7 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) { rwLocker1.logger = log rwLocker2 := GetRWLocker(rdb, &RedissionLockConfig{ - LockLeaseTime: 30, + LockLeaseTime: 120, NeedRefresh: true, Key: "component", Token: "fd348a84-e07c-4a61-8c19-f753e6bc5577", @@ -355,27 +351,21 @@ func TestRWLock2CWithTimeTransformRLock(t *testing.T) { hLen := rdb.HLen(ctx, rwLocker1.key).Val() assert.Equal(t, int64(3), hLen) - script := `return redis.call('httl', KEYS[1], 'fields', '1', ARGV[1]);` - result, err := rdb.Eval(ctx, script, []string{rwLocker1.key}, tokenKey1).Result() - assert.Equal(t, nil, err) - ttls, ok := result.([]interface{}) - assert.Equal(t, true, ok) - ttl, ok := ttls[0].(int64) - assert.Equal(t, true, ok) - compareValue := int64(110) - assert.Greater(t, ttl, compareValue) - - // locker1解读锁 - err = rwLocker1.UnRLock(ctx) - assert.Equal(t, nil, err) - - hashTTL := rdb.TTL(ctx, rwLocker1.key).Val().Seconds() - assert.Greater(t, hashTTL, float64(20)) + hashTTL := rdb.TTL(ctx, rwLocker2.key).Val().Seconds() + assert.Greater(t, hashTTL, float64(100)) // locker2解读锁 err = rwLocker2.UnRLock(ctx) assert.Equal(t, nil, err) + time.Sleep(10 * time.Second) + hashTTL = rdb.TTL(ctx, rwLocker1.key).Val().Seconds() + assert.Greater(t, hashTTL, float64(15)) + + // locker1解读锁 + err = rwLocker1.UnRLock(ctx) + assert.Equal(t, nil, err) + err = rdb.Exists(ctx, rwLocker1.key).Err() assert.Equal(t, nil, err) existNum := rdb.Exists(ctx, rwLocker1.key).Val() @@ -388,8 +378,7 @@ func TestRWLockWLockAndUnWLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -426,8 +415,7 @@ func TestRWLockReentrantWLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -482,8 +470,7 @@ func TestRWLock2CWithRLockAndWLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, }) @@ -514,13 +501,21 @@ func TestRWLock2CWithRLockAndWLock(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, 1, num) + // go func() { + // // locker1解写锁 + // time.Sleep(10 * time.Second) + // err = rwLocker1.UnRLock(ctx) + // assert.Equal(t, nil, err) + // }() + // locker2加写锁锁 - duration = 2 * time.Second + duration = 10 * time.Second err = rwLocker2.WLock(ctx, duration) // 预测加写锁失败 - assert.Equal(t, nil, err) + // TODO 优化输出 + fmt.Printf("wlock err:%v\n", err) + assert.Equal(t, constant.AcquireTimeoutErr, err) - // locker1解写锁 err = rwLocker1.UnRLock(ctx) assert.Equal(t, nil, err) @@ -533,8 +528,7 @@ func TestRWLock2CWithWLockAndRLock(t *testing.T) { ctx := context.TODO() rdb := redis.NewClient(&redis.Options{ Network: "tcp", - Addr: "192.168.2.104:6379", - Password: "cnstar", + Addr: "192.168.2.104:30001", PoolSize: 50, DialTimeout: 10 * time.Second, })