optimize cache item of kafka monitor topic
This commit is contained in:
parent
51e8a677ca
commit
7d8c442f9f
|
|
@ -3,13 +3,14 @@ package realtimedata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
"modelRT/network"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
|
|
@ -18,22 +19,20 @@ import (
|
||||||
|
|
||||||
// CacheManager define structure for managing cache items
|
// CacheManager define structure for managing cache items
|
||||||
type CacheManager struct {
|
type CacheManager struct {
|
||||||
ctx context.Context
|
mutex sync.RWMutex
|
||||||
mutex sync.RWMutex
|
store sync.Map
|
||||||
store sync.Map
|
pool sync.Pool
|
||||||
pool sync.Pool
|
dbClient *gorm.DB
|
||||||
dbClient *gorm.DB
|
kafkaConsumer *kafka.Consumer
|
||||||
kafkaClient *kafka.Consumer
|
ttl time.Duration
|
||||||
ttl time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCacheManager define func to create new cache manager
|
// NewCacheManager define func to create new cache manager
|
||||||
func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager {
|
func NewCacheManager(db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager {
|
||||||
cm := &CacheManager{
|
cm := &CacheManager{
|
||||||
ctx: ctx,
|
dbClient: db,
|
||||||
dbClient: db,
|
kafkaConsumer: kf,
|
||||||
kafkaClient: kf,
|
ttl: ttl,
|
||||||
ttl: ttl,
|
|
||||||
}
|
}
|
||||||
cm.pool.New = func() any {
|
cm.pool.New = func() any {
|
||||||
item := &CacheItem{}
|
item := &CacheItem{}
|
||||||
|
|
@ -42,6 +41,65 @@ func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl t
|
||||||
return cm
|
return cm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RealTimeComponentMonitor define func to continuously processing component info and build real time component data monitor from kafka specified topic
|
||||||
|
func (cm *CacheManager) RealTimeComponentMonitor(ctx context.Context, topic string, duration string) {
|
||||||
|
// context for graceful shutdown
|
||||||
|
cancelCtx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
monitorConsumer := cm.kafkaConsumer
|
||||||
|
|
||||||
|
// subscribe the monitor topic
|
||||||
|
err := monitorConsumer.SubscribeTopics([]string{topic}, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "subscribe to the monitor topic failed", "topic", topic, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer monitorConsumer.Close()
|
||||||
|
|
||||||
|
timeoutDuration, err := time.ParseDuration(duration)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "parse duration failed", "duration", duration, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// continuously read messages from kafka
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-cancelCtx.Done():
|
||||||
|
logger.Info(ctx, "context canceled, stopping read loop")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
msg, err := monitorConsumer.ReadMessage(timeoutDuration)
|
||||||
|
if err != nil {
|
||||||
|
if err.(kafka.Error).Code() == kafka.ErrTimedOut {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.Error(ctx, "consumer read message failed", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var realTimeData network.RealTimeDataReceiveRequest
|
||||||
|
if err := json.Unmarshal(msg.Value, &realTimeData); err != nil {
|
||||||
|
logger.Error(ctx, "unmarshal kafka message failed", "message", string(msg.Value), "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key := realTimeData.PayLoad.ComponentUUID
|
||||||
|
_, err = cm.StoreIntoCache(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "store into cache failed", "key", key, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = monitorConsumer.CommitMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "manual submission information failed", "message", msg, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetCacheItemFromPool define func to get a cache item from the pool
|
// GetCacheItemFromPool define func to get a cache item from the pool
|
||||||
func (cm *CacheManager) GetCacheItemFromPool() *CacheItem {
|
func (cm *CacheManager) GetCacheItemFromPool() *CacheItem {
|
||||||
item := cm.pool.Get().(*CacheItem)
|
item := cm.pool.Get().(*CacheItem)
|
||||||
|
|
@ -58,7 +116,10 @@ func (cm *CacheManager) PutCacheItemToPool(item *CacheItem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item
|
// StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item
|
||||||
func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) {
|
func (cm *CacheManager) StoreIntoCache(ctx context.Context, key string) (*CacheItem, error) {
|
||||||
|
cm.mutex.Lock()
|
||||||
|
defer cm.mutex.Unlock()
|
||||||
|
|
||||||
if cachedItemRaw, ok := cm.store.Load(key); ok {
|
if cachedItemRaw, ok := cm.store.Load(key); ok {
|
||||||
cachedItem := cachedItemRaw.(*CacheItem)
|
cachedItem := cachedItemRaw.(*CacheItem)
|
||||||
if !cachedItem.IsExpired(cm.ttl) {
|
if !cachedItem.IsExpired(cm.ttl) {
|
||||||
|
|
@ -71,28 +132,37 @@ func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) {
|
||||||
|
|
||||||
uuid, err := uuid.FromString(key)
|
uuid, err := uuid.FromString(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(cm.ctx, "format key to UUID failed", "key", key, "error", err)
|
logger.Error(ctx, "format key to UUID failed", "key", key, "error", err)
|
||||||
return nil, constants.ErrFormatUUID
|
return nil, constants.ErrFormatUUID
|
||||||
}
|
}
|
||||||
|
|
||||||
componentInfo, err := database.QueryComponentByUUID(cm.ctx, cm.dbClient, uuid)
|
componentInfo, err := database.QueryComponentByUUID(ctx, cm.dbClient, uuid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(nil, "query component from db failed by uuid", "component_uuid", key, "error", err)
|
logger.Error(ctx, "query component from db failed by uuid", "component_uuid", key, "error", err)
|
||||||
return nil, constants.ErrQueryComponentByUUID
|
return nil, constants.ErrQueryComponentByUUID
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem := cm.GetCacheItemFromPool()
|
newItem := cm.GetCacheItemFromPool()
|
||||||
newItem.key = key
|
newItem.key = key
|
||||||
newItem.value = []any{componentInfo.Context}
|
newItem.value = []any{componentInfo.Context}
|
||||||
// newItem.calculatorFunc = dbData.CalculatorFunc
|
// newItem.calculatorFunc = componentInfo.CalculatorFunc
|
||||||
newItem.lastAccessed = time.Now()
|
newItem.lastAccessed = time.Now()
|
||||||
|
|
||||||
|
// 在存储前启动goroutine
|
||||||
|
if newItem.calculatorFunc != nil {
|
||||||
|
newCtx, newCancel := context.WithCancel(ctx)
|
||||||
|
newItem.cancelFunc = newCancel
|
||||||
|
go newItem.calculatorFunc(newCtx, newItem.topic, newItem.value)
|
||||||
|
}
|
||||||
|
|
||||||
cm.store.Store(key, newItem)
|
cm.store.Store(key, newItem)
|
||||||
return newItem, nil
|
return newItem, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateCacheItem define func to update cache item with new data and trigger new calculation
|
// UpdateCacheItem define func to update cache item with new data and trigger new calculation
|
||||||
func (cm *CacheManager) UpdateCacheItem(key string, newData any) error {
|
func (cm *CacheManager) UpdateCacheItem(ctx context.Context, key string, newData any) error {
|
||||||
|
cm.mutex.Lock()
|
||||||
|
defer cm.mutex.Unlock()
|
||||||
|
|
||||||
cache, existed := cm.store.Load(key)
|
cache, existed := cm.store.Load(key)
|
||||||
if !existed {
|
if !existed {
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -103,28 +173,28 @@ func (cm *CacheManager) UpdateCacheItem(key string, newData any) error {
|
||||||
return constants.ErrFormatCache
|
return constants.ErrFormatCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stop old calculation goroutine
|
||||||
|
if cacheItem.cancelFunc != nil {
|
||||||
|
cacheItem.cancelFunc()
|
||||||
|
}
|
||||||
|
|
||||||
oldValue := cacheItem.value
|
oldValue := cacheItem.value
|
||||||
cacheItem.value = []any{newData}
|
cacheItem.value = []any{newData}
|
||||||
cacheItem.lastAccessed = time.Now()
|
cacheItem.lastAccessed = time.Now()
|
||||||
|
|
||||||
if cacheItem.noticeChan == nil {
|
newCtx, newCancel := context.WithCancel(ctx)
|
||||||
fmt.Println("noticeChan is nil, creating new one")
|
cacheItem.cancelFunc = newCancel
|
||||||
logger.Error(cm.ctx, "", "key", key, "error", "noticeChan is nil, can not notice calculation goroutine")
|
|
||||||
return constants.ErrChanIsNil
|
|
||||||
}
|
|
||||||
|
|
||||||
// stop old calculation goroutine
|
|
||||||
cacheItem.noticeChan <- struct{}{}
|
|
||||||
cacheItem.noticeChan = make(chan struct{})
|
|
||||||
|
|
||||||
swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem)
|
swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem)
|
||||||
if !swapped {
|
if !swapped {
|
||||||
cacheValue, _ := cm.store.Load(key)
|
cacheValue, _ := cm.store.Load(key)
|
||||||
logger.Error(cm.ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value)
|
logger.Error(ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value)
|
||||||
return constants.ErrConcurrentModify
|
return constants.ErrConcurrentModify
|
||||||
}
|
}
|
||||||
|
|
||||||
// start new calculation goroutine
|
// start new calculation goroutine
|
||||||
go cacheItem.calculatorFunc(cacheItem.noticeChan, cacheItem.value)
|
if cacheItem.calculatorFunc != nil {
|
||||||
|
go cacheItem.calculatorFunc(newCtx, cacheItem.topic, cacheItem.value)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,15 +2,18 @@
|
||||||
package realtimedata
|
package realtimedata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CacheItem define structure for caching real time data with calculation capabilities
|
// CacheItem define structure for caching real time data with calculation capabilities
|
||||||
type CacheItem struct {
|
type CacheItem struct {
|
||||||
key string
|
key string
|
||||||
value []any
|
value []any
|
||||||
noticeChan chan struct{}
|
// TODO 增加实时数据的topic
|
||||||
calculatorFunc func(done chan struct{}, params []any)
|
topic string
|
||||||
|
cancelFunc context.CancelFunc
|
||||||
|
calculatorFunc func(ctx context.Context, topic string, params []any)
|
||||||
lastAccessed time.Time
|
lastAccessed time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -18,7 +21,10 @@ type CacheItem struct {
|
||||||
func (ci *CacheItem) Reset() {
|
func (ci *CacheItem) Reset() {
|
||||||
ci.key = ""
|
ci.key = ""
|
||||||
ci.value = nil
|
ci.value = nil
|
||||||
ci.noticeChan = nil
|
if ci.cancelFunc != nil {
|
||||||
|
ci.cancelFunc()
|
||||||
|
}
|
||||||
|
ci.cancelFunc = nil
|
||||||
ci.calculatorFunc = nil
|
ci.calculatorFunc = nil
|
||||||
ci.lastAccessed = time.Time{}
|
ci.lastAccessed = time.Time{}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue