init redis hash and redis set struct with rwlocker
This commit is contained in:
parent
13809b6a31
commit
3d79993de2
|
|
@ -0,0 +1,89 @@
|
||||||
|
package diagram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
locker "modelRT/distributedlock"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
// "github.com/go-redis/redis"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO 统一 storageClient与 rwLocker 中使用的 redis 版本
|
||||||
|
// RedisHash defines the encapsulation struct of redis hash type
|
||||||
|
type RedisHash struct {
|
||||||
|
ctx context.Context
|
||||||
|
rwLocker *locker.RedissionRWLocker
|
||||||
|
storageClient *redis.Client
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRedisHashByMap define func of set redis hash by map struct
|
||||||
|
func (rh *RedisHash) SetRedisHashByMap(hashKey string, fields map[string]interface{}) error {
|
||||||
|
err := rh.rwLocker.WLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock wLock by hashKey failed", zap.String("hashKey", hashKey), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnWLock()
|
||||||
|
|
||||||
|
err = rh.storageClient.HSet(rh.ctx, hashKey, fields).Err()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("set hash by map failed", zap.String("hashKey", hashKey), zap.Any("fields", fields), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRedisHashByKV define func of set redis hash by kv struct
|
||||||
|
func (rh *RedisHash) SetRedisHashByKV(hashKey string, field string, value interface{}) error {
|
||||||
|
err := rh.rwLocker.WLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock wLock by hashKey failed", zap.String("hashKey", hashKey), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnWLock()
|
||||||
|
|
||||||
|
err = rh.storageClient.HSet(rh.ctx, hashKey, field, value).Err()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("set hash by kv failed", zap.String("hashKey", hashKey), zap.String("field", field), zap.Any("value", value), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HGet define func of get specified field value from redis hash by key and field name
|
||||||
|
func (rh *RedisHash) HGet(hashKey string, field string) (string, error) {
|
||||||
|
err := rh.rwLocker.RLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock rLock by hashKey failed", zap.String("hashKey", hashKey), zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnRLock()
|
||||||
|
|
||||||
|
result, err := rh.storageClient.HGet(rh.ctx, hashKey, field).Result()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("set hash by kv failed", zap.String("hashKey", hashKey), zap.String("field", field), zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HGetAll define func of get all filelds from redis hash by key
|
||||||
|
func (rh *RedisHash) HGetAll(hashKey string) (map[string]string, error) {
|
||||||
|
err := rh.rwLocker.RLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock rLock by hashKey failed", zap.String("hashKey", hashKey), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnRLock()
|
||||||
|
|
||||||
|
result, err := rh.storageClient.HGetAll(rh.ctx, hashKey).Result()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("get all hash field by hash key failed", zap.String("hashKey", hashKey), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
package diagram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
client *redis.Client
|
||||||
|
once sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetClientInstance define func of get redis client instance
|
||||||
|
func GetClientInstance() *redis.Client {
|
||||||
|
once.Do(func() {
|
||||||
|
// TODO 根据配置文件初始化 redis client
|
||||||
|
client = &redis.Client{}
|
||||||
|
})
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,89 @@
|
||||||
|
package diagram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
locker "modelRT/distributedlock"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO 统一 storageClient与 rwLocker 中使用的 redis 版本
|
||||||
|
// RedisSet defines the encapsulation struct of redis hash type
|
||||||
|
type RedisSet struct {
|
||||||
|
ctx context.Context
|
||||||
|
rwLocker *locker.RedissionRWLocker
|
||||||
|
storageClient *redis.Client
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// SADD define func of add redis set by members
|
||||||
|
func (rs *RedisSet) SADD(setKey string, members ...interface{}) error {
|
||||||
|
err := rs.rwLocker.WLock()
|
||||||
|
if err != nil {
|
||||||
|
rs.logger.Error("lock wLock by setKey failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rs.rwLocker.UnWLock()
|
||||||
|
|
||||||
|
err = rs.storageClient.SAdd(rs.ctx, setKey, members).Err()
|
||||||
|
if err != nil {
|
||||||
|
rs.logger.Error("add set by memebers failed", zap.String("setKey", setKey), zap.Any("members", members), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SREM define func of remove the specified members from redis set by key
|
||||||
|
func (rh *RedisHash) SREM(setKey string, members ...interface{}) error {
|
||||||
|
err := rh.rwLocker.WLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock wLock by setKey failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnWLock()
|
||||||
|
|
||||||
|
count, err := rh.storageClient.SRem(rh.ctx, setKey, members).Result()
|
||||||
|
if err != nil || count != int64(len(members)) {
|
||||||
|
rh.logger.Error("rem members from set failed", zap.String("setKey", setKey), zap.Any("members", members), zap.Error(err))
|
||||||
|
|
||||||
|
return fmt.Errorf("rem members from set failed:%w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SMembers define func of get all memebers from redis set by key
|
||||||
|
func (rh *RedisHash) SMembers(setKey string) ([]string, error) {
|
||||||
|
err := rh.rwLocker.RLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock rLock by setKey failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnRLock()
|
||||||
|
|
||||||
|
result, err := rh.storageClient.SMembers(rh.ctx, setKey).Result()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("get all hash field by hash key failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SIsMember define func of determine whether an member is in set by key
|
||||||
|
func (rh *RedisHash) SIsMember(setKey string, member interface{}) (bool, error) {
|
||||||
|
err := rh.rwLocker.RLock()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("lock rLock by setKey failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer rh.rwLocker.UnRLock()
|
||||||
|
|
||||||
|
result, err := rh.storageClient.SIsMember(rh.ctx, setKey, member).Result()
|
||||||
|
if err != nil {
|
||||||
|
rh.logger.Error("get all hash field by hash key failed", zap.String("setKey", setKey), zap.Error(err))
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
@ -220,7 +220,7 @@ func (rl *RedissionRWLocker) UnWLock() error {
|
||||||
res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token)
|
res := rl.client.Eval(luascript.UnWLockScript, []string{rl.key, rl.rwTokenTimeoutPrefix, rl.waitChanKey}, unlockMessage, rl.token)
|
||||||
val, err := res.Int()
|
val, err := res.Int()
|
||||||
if err != redis.Nil && err != nil {
|
if err != redis.Nil && err != nil {
|
||||||
rl.logger.Info("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
rl.logger.Error("unlock write lock failed", zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
|
||||||
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnWLockType, err.Error()))
|
return fmt.Errorf("unlock write lock failed:%w", constant.NewRedisResult(constant.UnknownInternalError, constant.UnWLockType, err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ func init() {
|
||||||
func TestRWLockRLockAndUnRLock(t *testing.T) {
|
func TestRWLockRLockAndUnRLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -56,7 +56,7 @@ func TestRWLockRLockAndUnRLock(t *testing.T) {
|
||||||
func TestRWLockReentrantRLock(t *testing.T) {
|
func TestRWLockReentrantRLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -110,7 +110,7 @@ func TestRWLockReentrantRLock(t *testing.T) {
|
||||||
func TestRWLockRefreshRLock(t *testing.T) {
|
func TestRWLockRefreshRLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -159,7 +159,7 @@ func TestRWLockRefreshRLock(t *testing.T) {
|
||||||
func TestRWLock2ClientRLock(t *testing.T) {
|
func TestRWLock2ClientRLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -225,7 +225,7 @@ func TestRWLock2ClientRLock(t *testing.T) {
|
||||||
func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -290,7 +290,7 @@ func TestRWLock2CWith2DifTimeRLock(t *testing.T) {
|
||||||
func TestRWLockWLockAndUnWLock(t *testing.T) {
|
func TestRWLockWLockAndUnWLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
@ -327,7 +327,7 @@ func TestRWLockWLockAndUnWLock(t *testing.T) {
|
||||||
func TestRWLockReentrantWLock(t *testing.T) {
|
func TestRWLockReentrantWLock(t *testing.T) {
|
||||||
rdb := redis.NewClient(&redis.Options{
|
rdb := redis.NewClient(&redis.Options{
|
||||||
Network: "tcp",
|
Network: "tcp",
|
||||||
Addr: "192.168.2.103:6379",
|
Addr: "192.168.2.104:6379",
|
||||||
Password: "cnstar",
|
Password: "cnstar",
|
||||||
PoolSize: 50,
|
PoolSize: 50,
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
|
|
|
||||||
3
go.mod
3
go.mod
|
|
@ -13,6 +13,7 @@ require (
|
||||||
github.com/json-iterator/go v1.1.12
|
github.com/json-iterator/go v1.1.12
|
||||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||||
github.com/panjf2000/ants/v2 v2.10.0
|
github.com/panjf2000/ants/v2 v2.10.0
|
||||||
|
github.com/redis/go-redis/v9 v9.7.3
|
||||||
github.com/spf13/viper v1.19.0
|
github.com/spf13/viper v1.19.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
github.com/swaggo/files v1.0.1
|
github.com/swaggo/files v1.0.1
|
||||||
|
|
@ -29,9 +30,11 @@ require (
|
||||||
github.com/KyleBanks/depth v1.2.1 // indirect
|
github.com/KyleBanks/depth v1.2.1 // indirect
|
||||||
github.com/bytedance/sonic v1.12.5 // indirect
|
github.com/bytedance/sonic v1.12.5 // indirect
|
||||||
github.com/bytedance/sonic/loader v0.2.1 // indirect
|
github.com/bytedance/sonic/loader v0.2.1 // indirect
|
||||||
|
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||||
github.com/cloudwego/base64x v0.1.4 // indirect
|
github.com/cloudwego/base64x v0.1.4 // indirect
|
||||||
github.com/cloudwego/iasm v0.2.0 // indirect
|
github.com/cloudwego/iasm v0.2.0 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||||
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
|
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
|
||||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue