diff --git a/constants/error.go b/constants/error.go index 5cd4291..2fc06e9 100644 --- a/constants/error.go +++ b/constants/error.go @@ -20,7 +20,6 @@ var ( ErrUUIDToCheckT3 = errors.New("in uuid add change type, value of old uuid_to is not empty") ) -// 错误定义 var ( // ErrInvalidAddressType define error of invalid io address type ErrInvalidAddressType = errors.New("invalid address type") @@ -32,5 +31,21 @@ var ( ErrUnsupportedChannelPrefixType = errors.New("unsupported channel prefix") ) +var ( + // ErrFormatUUID define error of format uuid string to uuid.UUID type failed + ErrFormatUUID = errors.New("format string type to uuid.UUID type failed") + // ErrFormatCache define error of format cache with any type to cacheItem type failed + ErrFormatCache = errors.New("format any teype to cache item type failed") +) + // ErrGetClientToken define error of can not get client_token from context var ErrGetClientToken = errors.New("can not get client_token from context") + +// ErrQueryComponentByUUID define error of query component from db by uuid failed +var ErrQueryComponentByUUID = errors.New("query component from db failed by uuid") + +// ErrChanIsNil define error of channel is nil +var ErrChanIsNil = errors.New("this channel is nil") + +// ErrConcurrentModify define error of concurrent modification detected +var ErrConcurrentModify = errors.New("existed concurrent modification risk") diff --git a/real-time-data/cache_manager.go b/real-time-data/cache_manager.go new file mode 100644 index 0000000..3465177 --- /dev/null +++ b/real-time-data/cache_manager.go @@ -0,0 +1,130 @@ +// Package realtimedata define real time data operation functions +package realtimedata + +import ( + "context" + "fmt" + "sync" + "time" + + "modelRT/constants" + "modelRT/database" + "modelRT/logger" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/gofrs/uuid" + "gorm.io/gorm" +) + +// CacheManager define structure for managing cache items +type CacheManager struct { + ctx context.Context + mutex sync.RWMutex + store sync.Map + pool sync.Pool + dbClient *gorm.DB + kafkaClient *kafka.Consumer + ttl time.Duration +} + +// NewCacheManager define func to create new cache manager +func NewCacheManager(ctx context.Context, db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager { + cm := &CacheManager{ + ctx: ctx, + dbClient: db, + kafkaClient: kf, + ttl: ttl, + } + cm.pool.New = func() any { + item := &CacheItem{} + return item + } + return cm +} + +// GetCacheItemFromPool define func to get a cache item from the pool +func (cm *CacheManager) GetCacheItemFromPool() *CacheItem { + item := cm.pool.Get().(*CacheItem) + item.Reset() + return item +} + +// PutCacheItemToPool define func to put a cache item back to the pool +func (cm *CacheManager) PutCacheItemToPool(item *CacheItem) { + if item != nil { + item.Reset() + cm.pool.Put(item) + } +} + +// StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item +func (cm *CacheManager) StoreIntoCache(key string) (*CacheItem, error) { + if cachedItemRaw, ok := cm.store.Load(key); ok { + cachedItem := cachedItemRaw.(*CacheItem) + if !cachedItem.IsExpired(cm.ttl) { + cachedItem.lastAccessed = time.Now() + return cachedItem, nil + } + cm.PutCacheItemToPool(cachedItem) + cm.store.Delete(key) + } + + uuid, err := uuid.FromString(key) + if err != nil { + logger.Error(cm.ctx, "format key to UUID failed", "key", key, "error", err) + return nil, constants.ErrFormatUUID + } + + componentInfo, err := database.QueryComponentByUUID(cm.ctx, cm.dbClient, uuid) + if err != nil { + logger.Error(nil, "query component from db failed by uuid", "component_uuid", key, "error", err) + return nil, constants.ErrQueryComponentByUUID + } + + newItem := cm.GetCacheItemFromPool() + newItem.key = key + newItem.value = []any{componentInfo.Context} + // newItem.calculatorFunc = dbData.CalculatorFunc + newItem.lastAccessed = time.Now() + + cm.store.Store(key, newItem) + return newItem, nil +} + +// UpdateCacheItem define func to update cache item with new data and trigger new calculation +func (cm *CacheManager) UpdateCacheItem(key string, newData any) error { + cache, existed := cm.store.Load(key) + if !existed { + return nil + } + + cacheItem, ok := cache.(*CacheItem) + if !ok { + return constants.ErrFormatCache + } + + oldValue := cacheItem.value + cacheItem.value = []any{newData} + cacheItem.lastAccessed = time.Now() + + if cacheItem.noticeChan == nil { + fmt.Println("noticeChan is nil, creating new one") + logger.Error(cm.ctx, "", "key", key, "error", "noticeChan is nil, can not notice calculation goroutine") + return constants.ErrChanIsNil + } + + // stop old calculation goroutine + cacheItem.noticeChan <- struct{}{} + cacheItem.noticeChan = make(chan struct{}) + + swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem) + if !swapped { + cacheValue, _ := cm.store.Load(key) + logger.Error(cm.ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value) + return constants.ErrConcurrentModify + } + + // start new calculation goroutine + go cacheItem.calculatorFunc(cacheItem.noticeChan, cacheItem.value) + return nil +} diff --git a/real-time-data/real_time_cache.go b/real-time-data/real_time_cache.go index eaa3a4b..7f46870 100644 --- a/real-time-data/real_time_cache.go +++ b/real-time-data/real_time_cache.go @@ -2,7 +2,6 @@ package realtimedata import ( - "sync" "time" ) @@ -10,39 +9,21 @@ import ( type CacheItem struct { key string value []any - CalculatorFunc any - LastAccessed time.Time + noticeChan chan struct{} + calculatorFunc func(done chan struct{}, params []any) + lastAccessed time.Time } -// Reset defines a method to reset the CacheItem to its zero state +// Reset defines func to reset the CacheItem to its zero state func (ci *CacheItem) Reset() { ci.key = "" ci.value = nil - ci.CalculatorFunc = nil - ci.LastAccessed = time.Time{} + ci.noticeChan = nil + ci.calculatorFunc = nil + ci.lastAccessed = time.Time{} } -// IsExpired defines a method to check if the cache item has expired based on a given TTL +// IsExpired defines func to check if the cache item has expired based on a given TTL func (ci *CacheItem) IsExpired(ttl time.Duration) bool { - return time.Since(ci.LastAccessed) > ttl -} - -// CachePool define sync.Pool for cache item objects to optimize memory usage and reduce allocations -var CachePool = sync.Pool{ - New: func() any { - return &CacheItem{} - }, -} - -// GetCacheItem define a method to get a cache item from the pool -func GetCacheItem() *CacheItem { - item := CachePool.Get().(*CacheItem) - item.Reset() - return item -} - -// PutCacheItem define a method to put a cache item back to the pool -func PutCacheItem(item *CacheItem) { - item.Reset() - CachePool.Put(item) + return time.Since(ci.lastAccessed) > ttl }