// Package realtimedata define real time data operation functions package realtimedata // import ( // "context" // "encoding/json" // "sync" // "time" // "modelRT/constants" // "modelRT/database" // "modelRT/logger" // "modelRT/network" // "github.com/confluentinc/confluent-kafka-go/kafka" // "github.com/gofrs/uuid" // "gorm.io/gorm" // ) // // CacheManager define structure for managing cache items // type CacheManager struct { // mutex sync.RWMutex // store sync.Map // pool sync.Pool // dbClient *gorm.DB // kafkaConsumer *kafka.Consumer // ttl time.Duration // } // // NewCacheManager define func to create new cache manager // func NewCacheManager(db *gorm.DB, kf *kafka.Consumer, ttl time.Duration) *CacheManager { // cm := &CacheManager{ // dbClient: db, // kafkaConsumer: kf, // ttl: ttl, // } // cm.pool.New = func() any { // item := &CacheItem{} // return item // } // return cm // } // // RealTimeComponentMonitor define func to continuously processing component info and build real time component data monitor from kafka specified topic // func (cm *CacheManager) RealTimeComponentMonitor(ctx context.Context, topic string, duration string) { // // context for graceful shutdown // cancelCtx, cancel := context.WithCancel(ctx) // defer cancel() // monitorConsumer := cm.kafkaConsumer // // subscribe the monitor topic // err := monitorConsumer.SubscribeTopics([]string{topic}, nil) // if err != nil { // logger.Error(ctx, "subscribe to the monitor topic failed", "topic", topic, "error", err) // return // } // defer monitorConsumer.Close() // timeoutDuration, err := time.ParseDuration(duration) // if err != nil { // logger.Error(ctx, "parse duration failed", "duration", duration, "error", err) // return // } // // continuously read messages from kafka // for { // select { // case <-cancelCtx.Done(): // logger.Info(ctx, "context canceled, stopping read loop") // return // default: // msg, err := monitorConsumer.ReadMessage(timeoutDuration) // if err != nil { // if err.(kafka.Error).Code() == kafka.ErrTimedOut { // continue // } // logger.Error(ctx, "consumer read message failed", "error", err) // continue // } // var realTimeData network.RealTimeDataReceiveRequest // if err := json.Unmarshal(msg.Value, &realTimeData); err != nil { // logger.Error(ctx, "unmarshal kafka message failed", "message", string(msg.Value), "error", err) // continue // } // key := realTimeData.PayLoad.ComponentUUID // _, err = cm.StoreIntoCache(ctx, key) // if err != nil { // logger.Error(ctx, "store into cache failed", "key", key, "error", err) // continue // } // _, err = monitorConsumer.CommitMessage(msg) // if err != nil { // logger.Error(ctx, "manual submission information failed", "message", msg, "error", err) // } // } // } // } // // GetCacheItemFromPool define func to get a cache item from the pool // func (cm *CacheManager) GetCacheItemFromPool() *CacheItem { // item := cm.pool.Get().(*CacheItem) // item.Reset() // return item // } // // PutCacheItemToPool define func to put a cache item back to the pool // func (cm *CacheManager) PutCacheItemToPool(item *CacheItem) { // if item != nil { // item.Reset() // cm.pool.Put(item) // } // } // // StoreIntoCache define func to store data into cache, if the key already exists and is not expired, return the existing item // func (cm *CacheManager) StoreIntoCache(ctx context.Context, key string) (*CacheItem, error) { // cm.mutex.Lock() // defer cm.mutex.Unlock() // if cachedItemRaw, ok := cm.store.Load(key); ok { // cachedItem := cachedItemRaw.(*CacheItem) // if !cachedItem.IsExpired(cm.ttl) { // cachedItem.lastAccessed = time.Now() // return cachedItem, nil // } // cm.PutCacheItemToPool(cachedItem) // cm.store.Delete(key) // } // uuid, err := uuid.FromString(key) // if err != nil { // logger.Error(ctx, "format key to UUID failed", "key", key, "error", err) // return nil, constants.ErrFormatUUID // } // componentInfo, err := database.QueryComponentByUUID(ctx, cm.dbClient, uuid) // if err != nil { // logger.Error(ctx, "query component from db failed by uuid", "component_uuid", key, "error", err) // return nil, constants.ErrQueryComponentByUUID // } // newItem := cm.GetCacheItemFromPool() // newItem.key = key // newItem.value = []any{componentInfo.Context} // // newItem.calculatorFunc = componentInfo.CalculatorFunc // newItem.lastAccessed = time.Now() // // 在存储前启动goroutine // if newItem.calculatorFunc != nil { // newCtx, newCancel := context.WithCancel(ctx) // newItem.cancelFunc = newCancel // go newItem.calculatorFunc(newCtx, newItem.topic, newItem.value) // } // cm.store.Store(key, newItem) // return newItem, nil // } // // UpdateCacheItem define func to update cache item with new data and trigger new calculation // func (cm *CacheManager) UpdateCacheItem(ctx context.Context, key string, newData any) error { // cm.mutex.Lock() // defer cm.mutex.Unlock() // cache, existed := cm.store.Load(key) // if !existed { // return nil // } // cacheItem, ok := cache.(*CacheItem) // if !ok { // return constants.ErrFormatCache // } // // stop old calculation goroutine // if cacheItem.cancelFunc != nil { // cacheItem.cancelFunc() // } // oldValue := cacheItem.value // cacheItem.value = []any{newData} // cacheItem.lastAccessed = time.Now() // newCtx, newCancel := context.WithCancel(ctx) // cacheItem.cancelFunc = newCancel // swapped := cm.store.CompareAndSwap(key, oldValue, cacheItem) // if !swapped { // cacheValue, _ := cm.store.Load(key) // logger.Error(ctx, "store new value into cache failed,existed concurrent modification risk", "key", key, "old_value", oldValue, "cache_value", cacheValue, "store_value", cacheItem.value) // return constants.ErrConcurrentModify // } // // start new calculation goroutine // if cacheItem.calculatorFunc != nil { // go cacheItem.calculatorFunc(newCtx, cacheItem.topic, cacheItem.value) // } // return nil // }