optimize variable naming and init real time data compute func
This commit is contained in:
parent
984ee3003d
commit
d434a7737d
|
|
@ -46,3 +46,17 @@ func QueryMeasurementByToken(ctx context.Context, tx *gorm.DB, token string) (or
|
||||||
}
|
}
|
||||||
return component, nil
|
return component, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAllMeasurements define func to query all measurement info from postgresDB
|
||||||
|
func GetAllMeasurements(ctx context.Context, tx *gorm.DB) ([]orm.Measurement, error) {
|
||||||
|
var measurements []orm.Measurement
|
||||||
|
|
||||||
|
// ctx超时判断
|
||||||
|
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&measurements)
|
||||||
|
if result.Error != nil {
|
||||||
|
return nil, result.Error
|
||||||
|
}
|
||||||
|
return measurements, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -149,8 +149,8 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network
|
||||||
return conn.WriteJSON(response)
|
return conn.WriteJSON(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processTargetPolling define function to process target in monitor map and data is continuously retrieved from redis based on the target
|
// processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target
|
||||||
func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan network.RealTimePullTarget) {
|
func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) {
|
||||||
// ensure the fanInChan will not leak
|
// ensure the fanInChan will not leak
|
||||||
defer close(fanInChan)
|
defer close(fanInChan)
|
||||||
|
|
||||||
|
|
@ -224,7 +224,7 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendTargets starts new polling goroutines for targets that were just added
|
// appendTargets starts new polling goroutines for targets that were just added
|
||||||
func appendTargets(ctx context.Context, config *RealTimeMonitorConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
|
func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) {
|
||||||
targetSet := make(map[string]struct{}, len(appendTargets))
|
targetSet := make(map[string]struct{}, len(appendTargets))
|
||||||
for _, target := range appendTargets {
|
for _, target := range appendTargets {
|
||||||
targetSet[target] = struct{}{}
|
targetSet[target] = struct{}{}
|
||||||
|
|
|
||||||
|
|
@ -19,10 +19,10 @@ import (
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
var globalSubState *SharedMonitorState
|
var globalSubState *SharedSubState
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
globalSubState = NewSharedMonitorState()
|
globalSubState = NewSharedSubState()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RealTimeSubHandler define real time data subscriptions process API
|
// RealTimeSubHandler define real time data subscriptions process API
|
||||||
|
|
@ -204,40 +204,40 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RealTimeMonitorComponent define struct of real time subscription component
|
// RealTimeSubComponent define struct of real time subscription component
|
||||||
type RealTimeMonitorComponent struct {
|
type RealTimeSubComponent struct {
|
||||||
targets []string
|
targets []string
|
||||||
targetParam map[string]*orm.Measurement
|
targetParam map[string]*orm.Measurement
|
||||||
}
|
}
|
||||||
|
|
||||||
// RealTimeMonitorConfig define struct of real time subscription config
|
// RealTimeSubConfig define struct of real time subscription config
|
||||||
type RealTimeMonitorConfig struct {
|
type RealTimeSubConfig struct {
|
||||||
noticeChan chan *transportTargets
|
noticeChan chan *transportTargets
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
components map[string]*RealTimeMonitorComponent
|
components map[string]*RealTimeSubComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
// SharedMonitorState define struct of shared subscription state with mutex
|
// SharedSubState define struct of shared subscription state with mutex
|
||||||
type SharedMonitorState struct {
|
type SharedSubState struct {
|
||||||
subMap map[string]*RealTimeMonitorConfig
|
subMap map[string]*RealTimeSubConfig
|
||||||
globalMutex sync.RWMutex
|
globalMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSharedMonitorState define function to create new SharedMonitorState
|
// NewSharedSubState define function to create new SharedSubState
|
||||||
func NewSharedMonitorState() *SharedMonitorState {
|
func NewSharedSubState() *SharedSubState {
|
||||||
return &SharedMonitorState{
|
return &SharedSubState{
|
||||||
subMap: make(map[string]*RealTimeMonitorConfig),
|
subMap: make(map[string]*RealTimeSubConfig),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken)
|
// processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken)
|
||||||
func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) (
|
func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) (
|
||||||
[]network.TargetResult,
|
[]network.TargetResult,
|
||||||
map[string]*RealTimeMonitorComponent,
|
map[string]*RealTimeSubComponent,
|
||||||
[]string,
|
[]string,
|
||||||
) {
|
) {
|
||||||
targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum)
|
targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum)
|
||||||
newComponentsMap := make(map[string]*RealTimeMonitorComponent)
|
newComponentsMap := make(map[string]*RealTimeSubComponent)
|
||||||
successfulTargets := make([]string, 0, allReqTargetNum)
|
successfulTargets := make([]string, 0, allReqTargetNum)
|
||||||
|
|
||||||
for _, componentItem := range components {
|
for _, componentItem := range components {
|
||||||
|
|
@ -260,7 +260,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []ne
|
||||||
successfulTargets = append(successfulTargets, target)
|
successfulTargets = append(successfulTargets, target)
|
||||||
|
|
||||||
if _, ok := newComponentsMap[interval]; !ok {
|
if _, ok := newComponentsMap[interval]; !ok {
|
||||||
newComponentsMap[interval] = &RealTimeMonitorComponent{
|
newComponentsMap[interval] = &RealTimeSubComponent{
|
||||||
targets: make([]string, 0, len(componentItem.Targets)),
|
targets: make([]string, 0, len(componentItem.Targets)),
|
||||||
targetParam: make(map[string]*orm.Measurement),
|
targetParam: make(map[string]*orm.Measurement),
|
||||||
}
|
}
|
||||||
|
|
@ -275,7 +275,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []ne
|
||||||
}
|
}
|
||||||
|
|
||||||
// mergeComponents define func to merge newComponentsMap into existingComponentsMap
|
// mergeComponents define func to merge newComponentsMap into existingComponentsMap
|
||||||
func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, newComponents map[string]*RealTimeMonitorComponent) {
|
func mergeComponents(existingComponents map[string]*RealTimeSubComponent, newComponents map[string]*RealTimeSubComponent) {
|
||||||
for interval, newComp := range newComponents {
|
for interval, newComp := range newComponents {
|
||||||
if existingComp, ok := existingComponents[interval]; ok {
|
if existingComp, ok := existingComponents[interval]; ok {
|
||||||
existingComp.targets = append(existingComp.targets, newComp.targets...)
|
existingComp.targets = append(existingComp.targets, newComp.targets...)
|
||||||
|
|
@ -286,8 +286,8 @@ func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, ne
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateConfig define function to create config in SharedMonitorState
|
// CreateConfig define function to create config in SharedSubState
|
||||||
func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
||||||
requestTargetsCount := processRealTimeRequestCount(components)
|
requestTargetsCount := processRealTimeRequestCount(components)
|
||||||
targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
|
targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
|
||||||
|
|
||||||
|
|
@ -299,7 +299,7 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie
|
||||||
return targetProcessResults, err
|
return targetProcessResults, err
|
||||||
}
|
}
|
||||||
|
|
||||||
config := &RealTimeMonitorConfig{
|
config := &RealTimeSubConfig{
|
||||||
noticeChan: make(chan *transportTargets),
|
noticeChan: make(chan *transportTargets),
|
||||||
components: newComponentsMap, // 直接使用预构建的 Map
|
components: newComponentsMap, // 直接使用预构建的 Map
|
||||||
}
|
}
|
||||||
|
|
@ -308,8 +308,8 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie
|
||||||
return targetProcessResults, nil
|
return targetProcessResults, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendTargets define function to append targets in SharedMonitorState
|
// AppendTargets define function to append targets in SharedSubState
|
||||||
func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
||||||
requestTargetsCount := processRealTimeRequestCount(components)
|
requestTargetsCount := processRealTimeRequestCount(components)
|
||||||
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
||||||
|
|
||||||
|
|
@ -364,7 +364,7 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, cli
|
||||||
targets := make([]string, 0, len(componentItem.Targets))
|
targets := make([]string, 0, len(componentItem.Targets))
|
||||||
targetParam := make(map[string]*orm.Measurement)
|
targetParam := make(map[string]*orm.Measurement)
|
||||||
targetParam[target] = targetModel.GetMeasurementInfo()
|
targetParam[target] = targetModel.GetMeasurementInfo()
|
||||||
config.components[interval] = &RealTimeMonitorComponent{
|
config.components[interval] = &RealTimeSubComponent{
|
||||||
targets: append(targets, target),
|
targets: append(targets, target),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -380,8 +380,8 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, cli
|
||||||
return targetProcessResults, nil
|
return targetProcessResults, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpsertTargets define function to upsert targets in SharedMonitorState
|
// UpsertTargets define function to upsert targets in SharedSubState
|
||||||
func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
||||||
requestTargetsCount := processRealTimeRequestCount(components)
|
requestTargetsCount := processRealTimeRequestCount(components)
|
||||||
targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
|
targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount)
|
||||||
|
|
||||||
|
|
@ -399,7 +399,7 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli
|
||||||
opType = constants.OpAppend
|
opType = constants.OpAppend
|
||||||
s.globalMutex.Lock()
|
s.globalMutex.Lock()
|
||||||
if config, exist = s.subMap[clientID]; !exist {
|
if config, exist = s.subMap[clientID]; !exist {
|
||||||
config = &RealTimeMonitorConfig{
|
config = &RealTimeSubConfig{
|
||||||
noticeChan: make(chan *transportTargets),
|
noticeChan: make(chan *transportTargets),
|
||||||
components: newComponentsMap,
|
components: newComponentsMap,
|
||||||
}
|
}
|
||||||
|
|
@ -423,8 +423,8 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli
|
||||||
return targetProcessResults, nil
|
return targetProcessResults, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveTargets define function to remove targets in SharedMonitorState
|
// RemoveTargets define function to remove targets in SharedSubState
|
||||||
func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
|
||||||
requestTargetsCount := processRealTimeRequestCount(components)
|
requestTargetsCount := processRealTimeRequestCount(components)
|
||||||
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
|
||||||
|
|
||||||
|
|
@ -519,8 +519,8 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string,
|
||||||
return targetProcessResults, nil
|
return targetProcessResults, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get define function to get subscriptions config from SharedMonitorState
|
// Get define function to get subscriptions config from SharedSubState
|
||||||
func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) {
|
func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) {
|
||||||
s.globalMutex.RLock()
|
s.globalMutex.RLock()
|
||||||
defer s.globalMutex.RUnlock()
|
defer s.globalMutex.RUnlock()
|
||||||
|
|
||||||
|
|
|
||||||
23
main.go
23
main.go
|
|
@ -26,7 +26,6 @@ import (
|
||||||
"modelRT/router"
|
"modelRT/router"
|
||||||
"modelRT/util"
|
"modelRT/util"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
swaggerFiles "github.com/swaggo/files"
|
swaggerFiles "github.com/swaggo/files"
|
||||||
|
|
@ -146,19 +145,6 @@ func main() {
|
||||||
}
|
}
|
||||||
defer anchorRealTimePool.Release()
|
defer anchorRealTimePool.Release()
|
||||||
|
|
||||||
// TODO 配置文件中增加 kafka 配置
|
|
||||||
// init cancel context
|
|
||||||
cancelCtx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
customerConf := &kafka.ConfigMap{
|
|
||||||
"bootstrap.servers": modelRTConfig.KafkaConfig.Servers,
|
|
||||||
"group.id": modelRTConfig.KafkaConfig.GroupID,
|
|
||||||
"auto.offset.reset": modelRTConfig.KafkaConfig.AutoOffsetReset,
|
|
||||||
"enable.auto.commit": modelRTConfig.KafkaConfig.EnableAutoCommit,
|
|
||||||
}
|
|
||||||
|
|
||||||
go realtimedata.ReceiveChan(cancelCtx, customerConf, []string{modelRTConfig.KafkaConfig.Topic}, modelRTConfig.KafkaConfig.ReadMessageTimeDuration)
|
|
||||||
|
|
||||||
postgresDBClient.Transaction(func(tx *gorm.DB) error {
|
postgresDBClient.Transaction(func(tx *gorm.DB) error {
|
||||||
// load circuit diagram from postgres
|
// load circuit diagram from postgres
|
||||||
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
|
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
|
||||||
|
|
@ -167,7 +153,14 @@ func main() {
|
||||||
// panic(err)
|
// panic(err)
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// TODO 暂时屏蔽完成 swagger 启动测试
|
allMeasurement, err := database.GetAllMeasurements(ctx, tx)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement)
|
||||||
|
|
||||||
tree, err := database.QueryTopologicFromDB(ctx, tx)
|
tree, err := database.QueryTopologicFromDB(ctx, tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,63 @@
|
||||||
|
// Package realtimedata define real time data operation functions
|
||||||
|
package realtimedata
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// ComputeConfig define struct of measurement computation
|
||||||
|
type ComputeConfig struct {
|
||||||
|
Cause map[string]any
|
||||||
|
Action map[string]any
|
||||||
|
StopGchan chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
||||||
|
type MeasComputeState struct {
|
||||||
|
measMap sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
|
||||||
|
func NewMeasComputeState() *MeasComputeState {
|
||||||
|
return &MeasComputeState{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store define func to store a compute configuration for the specified key
|
||||||
|
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
|
||||||
|
m.measMap.Store(key, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load define func to retrieve the compute configuration for the specified key
|
||||||
|
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
|
||||||
|
value, ok := m.measMap.Load(key)
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return value.(*ComputeConfig), true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete define func to remove the compute configuration for the specified key
|
||||||
|
func (m *MeasComputeState) Delete(key string) {
|
||||||
|
m.measMap.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
|
||||||
|
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
|
||||||
|
value, loaded := m.measMap.LoadOrStore(key, config)
|
||||||
|
return value.(*ComputeConfig), loaded
|
||||||
|
}
|
||||||
|
|
||||||
|
// Range define func to iterate over all key-configuration pairs in the map
|
||||||
|
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
|
||||||
|
m.measMap.Range(func(key, value any) bool {
|
||||||
|
return f(key.(string), value.(*ComputeConfig))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len define func to return the number of compute configurations in the map
|
||||||
|
func (m *MeasComputeState) Len() int {
|
||||||
|
count := 0
|
||||||
|
m.measMap.Range(func(_, _ any) bool {
|
||||||
|
count++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
@ -12,20 +12,44 @@ import (
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
"modelRT/orm"
|
||||||
"modelRT/pool"
|
"modelRT/pool"
|
||||||
"modelRT/util"
|
"modelRT/util"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RealTimeDataChan define channel of real time data receive
|
var (
|
||||||
var RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
// RealTimeDataChan define channel of real time data receive
|
||||||
|
RealTimeDataChan chan network.RealTimeDataReceiveRequest
|
||||||
|
globalComputeState *MeasComputeState
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
|
||||||
|
globalComputeState = NewMeasComputeState()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReceiveChan define func of real time data receive and process
|
// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info
|
||||||
|
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
|
||||||
|
for _, measurement := range measurements {
|
||||||
|
enableValue, exist := measurement.EventPlan["enable"]
|
||||||
|
enable, ok := enableValue.(bool)
|
||||||
|
if !exist || !enable {
|
||||||
|
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO 启动协程准备查询 redis 数据进行计算
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReceiveChan define func to real time data receive and process
|
||||||
func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
|
func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) {
|
||||||
consumer, err := kafka.NewConsumer(consumerConfig)
|
consumer, err := kafka.NewConsumer(consumerConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -48,6 +72,7 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
logger.Info(ctx, "stop real time data computing by context cancel")
|
||||||
return
|
return
|
||||||
case realTimeData := <-RealTimeDataChan:
|
case realTimeData := <-RealTimeDataChan:
|
||||||
componentUUID := realTimeData.PayLoad.ComponentUUID
|
componentUUID := realTimeData.PayLoad.ComponentUUID
|
||||||
|
|
@ -95,7 +120,7 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
||||||
msg, err := consumer.ReadMessage(batchTimeout)
|
msg, err := consumer.ReadMessage(batchTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.(kafka.Error).Code() == kafka.ErrTimedOut {
|
if err.(kafka.Error).Code() == kafka.ErrTimedOut {
|
||||||
// 超时时处理累积的消息
|
// process accumulated messages when timeout
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
processMessageBatch(ctx, messages)
|
processMessageBatch(ctx, messages)
|
||||||
consumer.Commit()
|
consumer.Commit()
|
||||||
|
|
@ -103,13 +128,12 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
logger.Error(ctx, "read message from kafka failed", "error", err)
|
logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
messages = append(messages, msg)
|
messages = append(messages, msg)
|
||||||
|
// process messages when batch size or timeout period is reached
|
||||||
// TODO 达到批处理大小或超时时间时处理消息
|
|
||||||
if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
|
if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout {
|
||||||
processMessageBatch(ctx, messages)
|
processMessageBatch(ctx, messages)
|
||||||
consumer.Commit()
|
consumer.Commit()
|
||||||
|
|
@ -133,7 +157,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
||||||
var realTimeData RealTimeData
|
var realTimeData RealTimeData
|
||||||
err := json.Unmarshal(msgValue, &realTimeData)
|
err := json.Unmarshal(msgValue, &realTimeData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("json unmarshal failed: %w", err)
|
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||||
}
|
}
|
||||||
return &realTimeData, nil
|
return &realTimeData, nil
|
||||||
}
|
}
|
||||||
|
|
@ -194,13 +218,14 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processMessageBatch define func to bathc process kafka message
|
||||||
func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
|
func processMessageBatch(ctx context.Context, messages []*kafka.Message) {
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
realTimeData, err := parseKafkaMessage(msg.Value)
|
realTimeData, err := parseKafkaMessage(msg.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "parse kafka message failed", "error", err)
|
logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
processRealTimeData(ctx, realTimeData)
|
go processRealTimeData(ctx, realTimeData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue