// Package distributed_lock implements Redis-backed read and write lockers
// whose state transitions are performed by Lua scripts.
package distributed_lock

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"modelRT/distributedlock/constant"
	"modelRT/distributedlock/luascript"
	"modelRT/logger"

	"github.com/go-redis/redis"
	uuid "github.com/google/uuid"
	"go.uber.org/zap"
)

// redissionReadLocker is the read side of the read/write lock. It embeds
// redissionLocker and adds the extra key names handed to the read-lock scripts.
type redissionReadLocker struct {
	redissionLocker
	// rwTimeoutPrefix is passed as a key to RLockScript and UnRLockScript.
	rwTimeoutPrefix string
	// prefixKey is the lock key wrapped in braces; it is passed as a key to
	// RefreshLockScript and UnRLockScript.
	prefixKey string
	// needRefresh enables the background lease-refresh goroutine once the
	// lock has been acquired.
	needRefresh bool
}

// Lock tries to acquire the read lock.
//
// Without a positive timeout[0] a single attempt is made and a wrapped
// RLockFailure error is returned if it does not succeed. With a positive
// timeout[0] the call subscribes to the wait channel and retries each time a
// message arrives, until the lock is acquired or the timeout elapses.
//
// It returns nil on success. ctx is currently not consulted.
//
// TODO: optimize away the ctx parameter.
func (rl *redissionReadLocker) Lock(ctx context.Context, timeout ...time.Duration) error {
	if rl.exit == nil {
		rl.exit = make(chan struct{})
	}

	first := rl.tryLock().(*constant.RedisError)
	switch first.Code {
	case constant.UnknownInternalError:
		rl.logger.Error(first.OutputResultMessage())
		return fmt.Errorf("get read lock failed:%w", first)
	case constant.LockSuccess:
		// Success must be reported even when no refresh is requested.
		rl.startRefreshOnce()
		return nil
	}

	// No waiting requested: fail fast without opening a subscription.
	if len(timeout) == 0 || timeout[0] <= 0 {
		return fmt.Errorf("get read lock failed:%w", constant.NewRedisError(constant.RLockFailure))
	}

	acquireTimer := time.NewTimer(timeout[0])
	defer acquireTimer.Stop()

	// NOTE(review): subMsg is closed here on the receiving side while
	// subscribeLock (defined elsewhere) is handed the same channel — confirm
	// that it cannot send or close after this function returns.
	subMsg := make(chan struct{}, 1)
	defer close(subMsg)
	sub := rl.client.Subscribe(rl.waitChankey)
	defer sub.Close()
	go rl.subscribeLock(sub, subMsg)

	for {
		select {
		case _, ok := <-subMsg:
			if !ok {
				err := errors.New("failed to read the read lock waiting for for the channel message")
				rl.logger.Error("failed to read the read lock waiting for for the channel message")
				return err
			}

			retry := rl.tryLock().(*constant.RedisError)
			switch retry.Code {
			case constant.LockSuccess:
				rl.logger.Info(retry.OutputResultMessage())
				// A lock won after waiting has the same lease as one won
				// immediately, so it needs the same refresh routine.
				rl.startRefreshOnce()
				return nil
			case constant.RLockFailure, constant.UnknownInternalError:
				rl.logger.Info(retry.OutputResultMessage())
			}
		case <-acquireTimer.C:
			err := errors.New("the waiting time for obtaining the read lock operation has timed out")
			rl.logger.Info("the waiting time for obtaining the read lock operation has timed out")
			return err
		}
	}
}

// startRefreshOnce launches the lease-refresh goroutine at most once per
// sync.Once, and only when needRefresh is set.
func (rl *redissionReadLocker) startRefreshOnce() {
	if !rl.needRefresh {
		return
	}
	rl.once.Do(func() {
		// Refresh the lock timeout asynchronously until the exit signal is received.
		go rl.refreshLockTimeout()
	})
}

// tryLock runs RLockScript once and converts its reply with
// constant.ConvertResultToErr. Redis errors other than redis.Nil, and replies
// that are not integers, are reported as UnknownInternalError.
func (rl *redissionReadLocker) tryLock() error {
	lockType := constant.LockType
	res := rl.client.Eval(luascript.RLockScript, []string{rl.key, rl.rwTimeoutPrefix}, rl.lockLeaseTime, rl.token)
	v, err := res.Result()
	if err != nil && err != redis.Nil {
		return constant.ConvertResultToErr(constant.UnknownInternalError, lockType, err.Error())
	}

	// The client hands integer replies back as int64 (and nil for redis.Nil),
	// so the value has to be converted rather than type-asserted to the
	// project's result type.
	code, ok := v.(int64)
	if !ok {
		msg := fmt.Sprintf("unexpected read lock script result: %v", v)
		return constant.ConvertResultToErr(constant.UnknownInternalError, lockType, msg)
	}
	return constant.ConvertResultToErr(constant.RedisResult(code), lockType, "")
}

// refreshLockTimeout periodically runs RefreshLockScript, every third of the
// lease time, until the script reports 0 or rl.exit is signalled.
func (rl *redissionReadLocker) refreshLockTimeout() {
	rl.logger.Debug("rlock: lock", zap.String("token", rl.token), zap.String("key", rl.key))

	interval := time.Duration(rl.lockLeaseTime/3) * time.Millisecond
	timer := time.NewTimer(interval)
	defer timer.Stop()

LOOP:
	for {
		select {
		case <-timer.C:
			timer.Reset(interval)
			// Update the key expire time.
			res := rl.client.Eval(luascript.RefreshLockScript, []string{rl.key, rl.prefixKey}, rl.lockLeaseTime, rl.token)
			val, err := res.Int()
			if err != nil {
				// A failure inside this background goroutine must not take
				// the whole process down; log it and retry on the next tick.
				rl.logger.Error("rlock: refresh lock timeout failed",
					zap.String("token", rl.token), zap.String("key", rl.key), zap.Error(err))
				continue
			}
			if val == 0 {
				rl.logger.Debug("not find the rlock key of self")
				break LOOP
			}
		case <-rl.exit:
			break LOOP
		}
	}

	rl.logger.Debug("rlock: refresh routine release", zap.String("token", rl.token))
}

// UnLock runs UnRLockScript for this locker's token. It panics if Redis
// returns an error other than redis.Nil, or if the script returns nil (the
// lock is not held by this token). When the script returns 1 the refresh
// routine is cancelled.
func (rl *redissionReadLocker) UnLock() {
	keys := []string{rl.key, rl.waitChankey, rl.rwTimeoutPrefix, rl.prefixKey}
	res := rl.client.Eval(luascript.UnRLockScript, keys, unlockMessage, rl.token)
	val, err := res.Result()
	if err != nil && err != redis.Nil {
		panic(err)
	}
	if val == nil {
		panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token)
	}

	rl.logger.Debug("rlock: unlock", zap.String("token", rl.token), zap.String("key", rl.key))
	if val.(int64) == 1 {
		rl.cancelRefreshLockTime()
	}
}

// redissionWriteLocker is the write side of the read/write lock; it adds no
// state beyond the embedded redissionLocker.
type redissionWriteLocker struct {
	redissionLocker
}

// Lock blocks until the write lock is acquired, or until timeout[0] elapses
// when a positive timeout is supplied. On timeout it simply returns; the
// signature gives the caller no indication of whether the lock is held.
//
// It panics if the lock script fails, if the wait channel is closed, or if
// ctx is done while waiting. A nil ctx is treated as never done.
func (rl *redissionWriteLocker) Lock(ctx context.Context, timeout ...time.Duration) {
	if rl.exit == nil {
		rl.exit = make(chan struct{})
	}

	ttl, err := rl.tryLock()
	if err != nil {
		panic(err)
	}
	if ttl <= 0 {
		rl.once.Do(func() {
			// Refresh the lock timeout asynchronously until the exit signal is received.
			go rl.refreshLockTimeout()
		})
		return
	}

	submsg := make(chan struct{}, 1)
	defer close(submsg)
	sub := rl.client.Subscribe(rl.waitChankey)
	defer sub.Close()
	go rl.subscribeLock(sub, submsg)

	// NOTE(review): tryLock converts the script's integer reply straight to a
	// time.Duration (nanoseconds). If WLockScript replies with a millisecond
	// TTL, this retry interval is far shorter than intended — confirm.
	timer := time.NewTimer(ttl)
	defer timer.Stop()

	// giveUp fires once the caller's timeout has elapsed. It stays nil (and
	// therefore never ready in the select) when no timeout was supplied.
	var giveUp <-chan time.Time
	if len(timeout) > 0 && timeout[0] > 0 {
		outimer := time.NewTimer(timeout[0])
		defer outimer.Stop()
		giveUp = outimer.C
	}

	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	for {
		ttl, err = rl.tryLock()
		if err != nil {
			panic(err)
		}
		if ttl <= 0 {
			rl.once.Do(func() {
				go rl.refreshLockTimeout()
			})
			return
		}

		select {
		case _, ok := <-submsg:
			// Drain without blocking so a value that was already consumed
			// (or never delivered) cannot stall the loop.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			if !ok {
				panic("lock listen release")
			}
			timer.Reset(ttl)
		case <-done:
			panic("lock context already release")
		case <-timer.C:
			timer.Reset(ttl)
		case <-giveUp:
			return
		}
	}
}

// tryLock runs WLockScript once. A nil reply yields a zero duration, which
// Lock treats as "acquired"; an integer reply is returned as a time.Duration.
// Redis errors other than redis.Nil and non-integer replies are returned as
// errors.
func (rl *redissionWriteLocker) tryLock() (time.Duration, error) {
	res := rl.client.Eval(luascript.WLockScript, []string{rl.key}, rl.lockLeaseTime, rl.token)
	v, err := res.Result()
	if err != nil && err != redis.Nil {
		return 0, err
	}
	if v == nil {
		return 0, nil
	}

	ttl, ok := v.(int64)
	if !ok {
		return 0, fmt.Errorf("unexpected write lock script result %v (%T)", v, v)
	}
	return time.Duration(ttl), nil
}

// UnLock runs UnWLockScript for this locker's token. It panics if Redis
// returns an error other than redis.Nil, or if the script returns nil (the
// lock is not held by this token). When the script returns 1 the refresh
// routine is cancelled.
func (rl *redissionWriteLocker) UnLock() {
	keys := []string{rl.key, rl.waitChankey}
	res := rl.client.Eval(luascript.UnWLockScript, keys, unlockMessage, rl.lockLeaseTime, rl.token)
	val, err := res.Result()
	if err != nil && err != redis.Nil {
		panic(err)
	}
	if val == nil {
		panic("attempt to unlock lock, not locked by current routine by lock id:" + rl.token)
	}

	rl.logger.Debug("lock: unlock", zap.String("token", rl.token), zap.String("key", rl.key))
	if val.(int64) == 1 {
		rl.cancelRefreshLockTime()
	}
}

// newRWLockerBase builds the redissionLocker state shared by the read and
// write lockers: a fresh UUID token, the exit channel, the refresh sync.Once,
// the logger, and the lock/wait-channel key names derived from ops.
//
// Empty ops.Prefix and ops.ChanPrefix are filled in with their defaults on
// ops itself.
func newRWLockerBase(client *redis.Client, ops *RedissionLockConfig) *redissionLocker {
	r := &redissionLocker{
		token:  uuid.New().String(),
		client: client,
		exit:   make(chan struct{}),
		once:   &sync.Once{},
		logger: logger.GetLoggerInstance(),
	}

	if len(ops.Prefix) <= 0 {
		ops.Prefix = "redission-rwlock"
	}
	if len(ops.ChanPrefix) <= 0 {
		ops.ChanPrefix = "redission-rwlock-channel"
	}

	// NOTE(review): a non-zero ops.LockLeaseTime is never copied into
	// r.lockLeaseTime, which then keeps its zero value. The field types are
	// not visible from this file, so no conversion is attempted here —
	// confirm the intended unit and assign it.
	if ops.LockLeaseTime == 0 {
		r.lockLeaseTime = internalLockLeaseTime
	}

	r.key = strings.Join([]string{ops.Prefix, ops.Key}, ":")
	r.waitChankey = strings.Join([]string{ops.ChanPrefix, ops.Key}, ":")
	return r
}

// GetReadLocker returns a read locker for ops.Key that talks to client. The
// returned locker has lease refresh enabled.
func GetReadLocker(client *redis.Client, ops *RedissionLockConfig) *redissionReadLocker {
	r := newRWLockerBase(client, ops)
	tkey := strings.Join([]string{"{", r.key, "}"}, "")

	return &redissionReadLocker{
		redissionLocker: *r,
		rwTimeoutPrefix: strings.Join([]string{tkey, r.token, "rwlock_timeout"}, ":"),
		prefixKey:       tkey,
		needRefresh:     true,
	}
}

// GetWriteLocker returns a write locker for ops.Key that talks to client.
func GetWriteLocker(client *redis.Client, ops *RedissionLockConfig) *redissionWriteLocker {
	r := newRWLockerBase(client, ops)
	return &redissionWriteLocker{redissionLocker: *r}
}