From 7d8c442f9f0c20f606027d1ed45eec9143ec074e Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 19 Sep 2025 16:15:59 +0800 Subject: [PATCH] optimize cache item of kafka monitor topic --- real-time-data/cache_manager.go | 132 +++++++++++++++++++++++------- real-time-data/real_time_cache.go | 16 ++-- 2 files changed, 112 insertions(+), 36 deletions(-) diff --git a/real-time-data/cache_manager.go b/real-time-data/cache_manager.go index 3465177..fc430c7 100644 --- a/real-time-data/cache_manager.go +++ b/real-time-data/cache_manager.go @@ -3,13 +3,14 @@ package realtimedata import ( "context" - "fmt" + "encoding/json" "sync" "time" "modelRT/constants" "modelRT/database" "modelRT/logger" + "modelRT/network" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/gofrs/uuid" @@ -18,22 +19,20 @@ import ( // CacheManager define structure for managing cache items type CacheManager struct { - ctx context.Context - mutex sync.RWMutex - store sync.Map - pool sync.Pool - dbClient *gorm.DB - kafkaClient *kafka.Consumer - ttl time.Duration + mutex sync.RWMutex + store sync.Map + pool sync.Pool + dbClient *gorm.DB + kafkaConsumer *kafka.Consumer + ttl time.Duration } // NewCacheManager define func to create new cache manager -func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager { +func NewCacheManager(db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager { cm := &CacheManager{ - ctx: ctx, - dbClient: db, - kafkaClient: kf, - ttl: ttl, + dbClient: db, + kafkaConsumer: kf, + ttl: ttl, } cm.pool.New = func() any { item := &CacheItem{} @@ -42,6 +41,65 @@ func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl t return cm } +// RealTimeComponentMonitor define func to continuously processing component info and build real time component data monitor from kafka specified topic +func (cm *CacheManager) RealTimeComponentMonitor(ctx context.Context, topic string, duration string) { + // context for graceful shutdown + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + monitorConsumer := cm.kafkaConsumer + + // subscribe the monitor topic + err := monitorConsumer.SubscribeTopics([]string{topic}, nil) + if err != nil { + logger.Error(ctx, "subscribe to the monitor topic failed", "topic", topic, "error", err) + return + } + defer monitorConsumer.Close() + + timeoutDuration, err := time.ParseDuration(duration) + if err != nil { + logger.Error(ctx, "parse duration failed", "duration", duration, "error", err) + return + } + + // continuously read messages from kafka + for { + select { + case <-cancelCtx.Done(): + logger.Info(ctx, "context canceled, stopping read loop") + return + default: + msg, err := monitorConsumer.ReadMessage(timeoutDuration) + if err != nil { + if err.(kafka.Error).Code() == kafka.ErrTimedOut { + continue + } + logger.Error(ctx, "consumer read message failed", "error", err) + continue + } + + var realTimeData network.RealTimeDataReceiveRequest + if err := json.Unmarshal(msg.Value, &realTimeData); err != nil { + logger.Error(ctx, "unmarshal kafka message failed", "message", string(msg.Value), "error", err) + continue + } + + key := realTimeData.PayLoad.ComponentUUID + _, err = cm.StoreIntoCache(ctx, key) + if err != nil { + logger.Error(ctx, "store into cache failed", "key", key, "error", err) + continue + } + + _, err = monitorConsumer.CommitMessage(msg) + if err != nil { + logger.Error(ctx, "manual submission information failed", "message", msg, "error", err) + } + } + } +} + // GetCacheItemFromPool define func to get a cache item from the pool func (cm *CacheManager) GetCacheItemFromPool() *CacheItem { item := cm.pool.Get().(*CacheItem) @@ -58,7 +116,10 @@ func (cm *CacheManager) PutCacheItemToPool(item *CacheItem) { } // StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item -func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) { +func (cm *CacheManager) StoreIntoCache(ctx context.Context, key string) (*CacheItem, error) { + cm.mutex.Lock() + defer cm.mutex.Unlock() + if cachedItemRaw, ok := cm.store.Load(key); ok { cachedItem := cachedItemRaw.(*CacheItem) if !cachedItem.IsExpired(cm.ttl) { @@ -71,28 +132,37 @@ func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) { uuid, err := uuid.FromString(key) if err != nil { - logger.Error(cm.ctx, "format key to UUID failed", "key", key, "error", err) + logger.Error(ctx, "format key to UUID failed", "key", key, "error", err) return nil, constants.ErrFormatUUID } - componentInfo, err := database.QueryComponentByUUID(cm.ctx, cm.dbClient, uuid) + componentInfo, err := database.QueryComponentByUUID(ctx, cm.dbClient, uuid) if err != nil { - logger.Error(nil, "query component from db failed by uuid", "component_uuid", key, "error", err) + logger.Error(ctx, "query component from db failed by uuid", "component_uuid", key, "error", err) return nil, constants.ErrQueryComponentByUUID } - newItem := cm.GetCacheItemFromPool() newItem.key = key newItem.value = []any{componentInfo.Context} - // newItem.calculatorFunc = dbData.CalculatorFunc + // newItem.calculatorFunc = componentInfo.CalculatorFunc newItem.lastAccessed = time.Now() + // 在存储前启动goroutine + if newItem.calculatorFunc != nil { + newCtx, newCancel := context.WithCancel(ctx) + newItem.cancelFunc = newCancel + go newItem.calculatorFunc(newCtx, newItem.topic, newItem.value) + } + cm.store.Store(key, newItem) return newItem, nil } // UpdateCacheItem define func to update cache item with new data and trigger new calculation -func (cm *CacheManager) UpdateCacheItem(key string, newData any) error { +func (cm *CacheManager) UpdateCacheItem(ctx context.Context, key string, newData any) error { + cm.mutex.Lock() + defer cm.mutex.Unlock() + cache, existed := cm.store.Load(key) if !existed { return nil @@ -103,28 +173,28 @@ func (cm *CacheManager) UpdateCacheItem(key string, newData any) error { return constants.ErrFormatCache } + // stop old calculation goroutine + if cacheItem.cancelFunc != nil { + cacheItem.cancelFunc() + } + oldValue := cacheItem.value cacheItem.value = []any{newData} cacheItem.lastAccessed = time.Now() - if cacheItem.noticeChan == nil { - fmt.Println("noticeChan is nil, creating new one") - logger.Error(cm.ctx, "", "key", key, "error", "noticeChan is nil, can not notice calculation goroutine") - return constants.ErrChanIsNil - } - - // stop old calculation goroutine - cacheItem.noticeChan <- struct{}{} - cacheItem.noticeChan = make(chan struct{}) + newCtx, newCancel := context.WithCancel(ctx) + cacheItem.cancelFunc = newCancel swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem) if !swapped { cacheValue, _ := cm.store.Load(key) - logger.Error(cm.ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value) + logger.Error(ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value) return constants.ErrConcurrentModify } // start new calculation goroutine - go cacheItem.calculatorFunc(cacheItem.noticeChan, cacheItem.value) + if cacheItem.calculatorFunc != nil { + go cacheItem.calculatorFunc(newCtx, cacheItem.topic, cacheItem.value) + } return nil } diff --git a/real-time-data/real_time_cache.go b/real-time-data/real_time_cache.go index 7f46870..b06c6a5 100644 --- a/real-time-data/real_time_cache.go +++ b/real-time-data/real_time_cache.go @@ -2,15 +2,18 @@ package realtimedata import ( + "context" "time" ) // CacheItem define structure for caching real time data with calculation capabilities type CacheItem struct { - key string - value []any - noticeChan chan struct{} - calculatorFunc func(done chan struct{}, params []any) + key string + value []any + // TODO 增加实时数据的topic + topic string + cancelFunc context.CancelFunc + calculatorFunc func(ctx context.Context, topic string, params []any) lastAccessed time.Time } @@ -18,7 +21,10 @@ type CacheItem struct { func (ci *CacheItem) Reset() { ci.key = "" ci.value = nil - ci.noticeChan = nil + if ci.cancelFunc != nil { + ci.cancelFunc() + } + ci.cancelFunc = nil ci.calculatorFunc = nil ci.lastAccessed = time.Time{} }