modelRT/real-time-data/cache_manager.go

131 lines
3.6 KiB
Go

// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"fmt"
"sync"
"time"
"modelRT/constants"
"modelRT/database"
"modelRT/logger"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/gofrs/uuid"
"gorm.io/gorm"
)
// CacheManager define structure for managing cache items
type CacheManager struct {
ctx context.Context
mutex sync.RWMutex
store sync.Map
pool sync.Pool
dbClient *gorm.DB
kafkaClient *kafka.Consumer
ttl time.Duration
}
// NewCacheManager define func to create new cache manager
func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager {
cm := &CacheManager{
ctx: ctx,
dbClient: db,
kafkaClient: kf,
ttl: ttl,
}
cm.pool.New = func() any {
item := &CacheItem{}
return item
}
return cm
}
// GetCacheItemFromPool define func to get a cache item from the pool
func (cm *CacheManager) GetCacheItemFromPool() *CacheItem {
item := cm.pool.Get().(*CacheItem)
item.Reset()
return item
}
// PutCacheItemToPool define func to put a cache item back to the pool
func (cm *CacheManager) PutCacheItemToPool(item *CacheItem) {
if item != nil {
item.Reset()
cm.pool.Put(item)
}
}
// StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item
func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) {
if cachedItemRaw, ok := cm.store.Load(key); ok {
cachedItem := cachedItemRaw.(*CacheItem)
if !cachedItem.IsExpired(cm.ttl) {
cachedItem.lastAccessed = time.Now()
return cachedItem, nil
}
cm.PutCacheItemToPool(cachedItem)
cm.store.Delete(key)
}
uuid, err := uuid.FromString(key)
if err != nil {
logger.Error(cm.ctx, "format key to UUID failed", "key", key, "error", err)
return nil, constants.ErrFormatUUID
}
componentInfo, err := database.QueryComponentByUUID(cm.ctx, cm.dbClient, uuid)
if err != nil {
logger.Error(nil, "query component from db failed by uuid", "component_uuid", key, "error", err)
return nil, constants.ErrQueryComponentByUUID
}
newItem := cm.GetCacheItemFromPool()
newItem.key = key
newItem.value = []any{componentInfo.Context}
// newItem.calculatorFunc = dbData.CalculatorFunc
newItem.lastAccessed = time.Now()
cm.store.Store(key, newItem)
return newItem, nil
}
// UpdateCacheItem define func to update cache item with new data and trigger new calculation
func (cm *CacheManager) UpdateCacheItem(key string, newData any) error {
cache, existed := cm.store.Load(key)
if !existed {
return nil
}
cacheItem, ok := cache.(*CacheItem)
if !ok {
return constants.ErrFormatCache
}
oldValue := cacheItem.value
cacheItem.value = []any{newData}
cacheItem.lastAccessed = time.Now()
if cacheItem.noticeChan == nil {
fmt.Println("noticeChan is nil, creating new one")
logger.Error(cm.ctx, "", "key", key, "error", "noticeChan is nil, can not notice calculation goroutine")
return constants.ErrChanIsNil
}
// stop old calculation goroutine
cacheItem.noticeChan <- struct{}{}
cacheItem.noticeChan = make(chan struct{})
swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem)
if !swapped {
cacheValue, _ := cm.store.Load(key)
logger.Error(cm.ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value)
return constants.ErrConcurrentModify
}
// start new calculation goroutine
go cacheItem.calculatorFunc(cacheItem.noticeChan, cacheItem.value)
return nil
}