diff --git a/constants/buffer.go b/constants/buffer.go new file mode 100644 index 0000000..e8079f8 --- /dev/null +++ b/constants/buffer.go @@ -0,0 +1,11 @@ +// Package constants define constant variable +package constants + +import "time" + +const ( + // SendMaxBatchSize define maximum buffer capacity + SendMaxBatchSize = 100 + // SendMaxBatchInterval define maximum aggregate latency + SendMaxBatchInterval = 200 * time.Millisecond +) diff --git a/handler/alert_event_query.go b/handler/alert_event_query.go index 2365a1b..58e1613 100644 --- a/handler/alert_event_query.go +++ b/handler/alert_event_query.go @@ -35,7 +35,7 @@ func QueryAlertEventHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: 0, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]any{ "events": events, }, } diff --git a/handler/anchor_point_replace.go b/handler/anchor_point_replace.go index 272a697..877c877 100644 --- a/handler/anchor_point_replace.go +++ b/handler/anchor_point_replace.go @@ -68,7 +68,7 @@ func ComponentAnchorReplaceHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": request.UUID, }, } diff --git a/handler/attr_delete.go b/handler/attr_delete.go index 812acba..b78e46c 100644 --- a/handler/attr_delete.go +++ b/handler/attr_delete.go @@ -41,7 +41,7 @@ func AttrDeleteHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{"attr_token": request.AttrToken}, + Payload: map[string]interface{}{"attr_token": request.AttrToken}, }) return } @@ -49,7 +49,7 @@ func AttrDeleteHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "attr_token": request.AttrToken, }, }) diff --git a/handler/attr_load.go b/handler/attr_load.go index b669120..3a50be6 100644 --- a/handler/attr_load.go +++ b/handler/attr_load.go @@ -46,7 +46,7 @@ func AttrGetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{"attr_token": request.AttrToken}, + Payload: map[string]interface{}{"attr_token": request.AttrToken}, }) return } @@ -59,7 +59,7 @@ func AttrGetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "attr_token": request.AttrToken, "attr_value": attrValue, }, diff --git a/handler/attr_update.go b/handler/attr_update.go index 9ecc24c..589164e 100644 --- a/handler/attr_update.go +++ b/handler/attr_update.go @@ -43,7 +43,7 @@ func AttrSetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{"attr_token": request.AttrToken}, + Payload: map[string]interface{}{"attr_token": request.AttrToken}, }) return } @@ -51,7 +51,7 @@ func AttrSetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "attr_token": request.AttrToken, }, }) diff --git a/handler/circuit_diagram_create.go b/handler/circuit_diagram_create.go index cf61aa6..536151d 100644 --- a/handler/circuit_diagram_create.go +++ b/handler/circuit_diagram_create.go @@ -37,7 +37,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } @@ -65,7 +65,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicLink, }, } @@ -89,7 +89,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_infos": topologicCreateInfos, }, } @@ -111,7 +111,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "component_infos": request.ComponentInfos, }, } @@ -130,7 +130,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": info.UUID, "component_params": info.Params, }, @@ -152,7 +152,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } diff --git a/handler/circuit_diagram_delete.go b/handler/circuit_diagram_delete.go index aa60f15..a691679 100644 --- a/handler/circuit_diagram_delete.go +++ b/handler/circuit_diagram_delete.go @@ -42,7 +42,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } @@ -70,7 +70,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicLink, }, } @@ -95,7 +95,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicDelInfo, }, } @@ -112,7 +112,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicDelInfo, }, } @@ -138,7 +138,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": componentInfo.UUID, }, } @@ -162,7 +162,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": componentInfo.UUID, }, } @@ -184,7 +184,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": componentInfo.UUID, }, } @@ -205,7 +205,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } diff --git a/handler/circuit_diagram_load.go b/handler/circuit_diagram_load.go index 932d792..8f8c71c 100644 --- a/handler/circuit_diagram_load.go +++ b/handler/circuit_diagram_load.go @@ -33,7 +33,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": pageID, }, } @@ -48,16 +48,16 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": pageID, }, } c.JSON(http.StatusOK, resp) return } - payLoad := make(map[string]interface{}) - payLoad["root_vertex"] = topologicInfo.RootVertex - payLoad["topologic"] = topologicInfo.VerticeLinks + payload := make(map[string]interface{}) + payload["root_vertex"] = topologicInfo.RootVertex + payload["topologic"] = topologicInfo.VerticeLinks componentParamMap := make(map[string]any) for _, VerticeLink := range topologicInfo.VerticeLinks { @@ -69,7 +69,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": componentUUID, }, } @@ -84,7 +84,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": componentUUID, }, } @@ -103,7 +103,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": topologicInfo.RootVertex, }, } @@ -118,7 +118,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": rootVertexUUID, }, } @@ -127,12 +127,12 @@ func CircuitDiagramLoadHandler(c *gin.Context) { } componentParamMap[rootVertexUUID] = rootComponentParam - payLoad["component_params"] = componentParamMap + payload["component_params"] = componentParamMap resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: payLoad, + Payload: payload, } c.JSON(http.StatusOK, resp) } diff --git a/handler/circuit_diagram_update.go b/handler/circuit_diagram_update.go index db1900f..d0181e1 100644 --- a/handler/circuit_diagram_update.go +++ b/handler/circuit_diagram_update.go @@ -35,7 +35,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } @@ -52,7 +52,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicLink, }, } @@ -75,7 +75,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicChangeInfo, }, } @@ -92,7 +92,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "topologic_info": topologicChangeInfo, }, } @@ -109,7 +109,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, "component_info": request.ComponentInfos, }, @@ -129,7 +129,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "uuid": info.UUID, "component_params": info.Params, }, @@ -152,7 +152,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "page_id": request.PageID, }, } diff --git a/handler/history_data_query.go b/handler/history_data_query.go index 3c333b7..294f4b9 100644 --- a/handler/history_data_query.go +++ b/handler/history_data_query.go @@ -50,7 +50,7 @@ func QueryHistoryDataHandler(c *gin.Context) { resp := network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "events": events, }, } diff --git a/handler/measurement_load.go b/handler/measurement_load.go index 3d1b718..f90d362 100644 --- a/handler/measurement_load.go +++ b/handler/measurement_load.go @@ -45,7 +45,7 @@ func MeasurementGetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusInternalServerError, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "measurement_id": request.MeasurementID, "measurement_token": request.MeasurementToken, }, @@ -60,7 +60,7 @@ func MeasurementGetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "measurement_id": request.MeasurementID, "measurement_token": request.MeasurementToken, "measurement_value": points, @@ -72,7 +72,7 @@ func MeasurementGetHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: map[string]interface{}{ + Payload: map[string]interface{}{ "measurement_id": request.MeasurementID, "measurement_token": request.MeasurementToken, "measurement_info": measurementInfo, diff --git a/handler/measurement_recommend.go b/handler/measurement_recommend.go index 6e5a31e..a86e783 100644 --- a/handler/measurement_recommend.go +++ b/handler/measurement_recommend.go @@ -17,7 +17,7 @@ import ( // @Tags Measurement Recommend // @Accept json // @Produce json -// @Param request body network.MeasurementRecommendRequest true "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'" +// @Param。 input query string true "推荐关键词,例如 'trans' 或 'transformfeeder1_220.'" Example("trans") // @Success 200 {object} network.SuccessResponse{payload=network.MeasurementRecommendPayload} "返回推荐列表成功" // // @Example 200 { @@ -45,8 +45,8 @@ import ( func MeasurementRecommendHandler(c *gin.Context) { var request network.MeasurementRecommendRequest - if err := c.ShouldBindJSON(&request); err != nil { - logger.Error(c, "failed to unmarshal measurement recommend request", "error", err) + if err := c.ShouldBindQuery(&request); err != nil { + logger.Error(c, "failed to bind measurement recommend request", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -60,7 +60,7 @@ func MeasurementRecommendHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusInternalServerError, Msg: err.Error(), - PayLoad: map[string]any{ + Payload: map[string]any{ "input": request.Input, }, }) @@ -104,7 +104,7 @@ func MeasurementRecommendHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: &network.MeasurementRecommendPayload{ + Payload: &network.MeasurementRecommendPayload{ Input: request.Input, Offset: finalOffset, RecommendedList: resultRecommends, diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index c95e046..21dcc2b 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -60,24 +60,51 @@ func PullRealTimeDataHandler(c *gin.Context) { // TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1 fanInChan := make(chan network.RealTimePullTarget, 10000) go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan) - go readClientMessages(ctx, conn, clientID, cancel) + bufferMaxSize := constants.SendMaxBatchSize + sendMaxInterval := constants.SendMaxBatchInterval + buffer := make([]network.RealTimePullTarget, 0, bufferMaxSize) + ticker := time.NewTicker(sendMaxInterval) + defer ticker.Stop() + for { select { case targetData, ok := <-fanInChan: if !ok { - logger.Error(ctx, "fanInChan closed unexpectedly", "clientID", clientID) + logger.Error(ctx, "fanInChan closed unexpectedly", "client_id", clientID) return } - // TODO 考虑后续将多个 targets 聚合发送 - err := sendRealTimeDataStream(conn, targetData) - if err != nil { - logger.Error(nil, "clientID: %s 写入数据失败: %v", clientID, err) + buffer = append(buffer, targetData) + + if len(buffer) >= bufferMaxSize { + // buffer is full, send immediately + if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { + logger.Error(nil, "when buffer is full, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) + return + } + // reset buffer + buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) + // reset the ticker to prevent it from triggering immediately after the ticker is sent + ticker.Reset(sendMaxInterval) } - default: - // TODO[BACKPRESSURE-ISSUE] 考虑在此使用双重背压方式解决阻塞问题 #1 - logger.Warn(ctx, "Write channel full, dropping data for slow client.") + case <-ticker.C: + if len(buffer) > 0 { + // when the ticker is triggered, all data in the send buffer is sent + if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { + logger.Error(nil, "when the ticker is triggered, send the real time aggregate data failed", "client_id", clientID, "buffer", buffer, "error", err) + return + } + // reset buffer + buffer = make([]network.RealTimePullTarget, 0, bufferMaxSize) + } + case <-ctx.Done(): + // send the last remaining data + if err := sendAggregateRealTimeDataStream(conn, buffer); err != nil { + logger.Error(nil, "send the last remaining data failed", "client_id", clientID, "buffer", buffer, "error", err) + } + logger.Info(ctx, "PullRealTimeDataHandler exiting as context is done.", "client_id", clientID) + return } } } @@ -107,13 +134,16 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri } } -// sendRealTimeDataStream define func to responsible for continuously pushing real-time data to the client -func sendRealTimeDataStream(conn *websocket.Conn, targetData network.RealTimePullTarget) error { +// sendAggregateRealTimeDataStream define func to responsible for continuously pushing aggregate real-time data to the client +func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network.RealTimePullTarget) error { + if len(targetsData) == 0 { + return nil + } response := network.SuccessResponse{ Code: 200, Msg: "success", - PayLoad: network.RealTimePullPayload{ - Targets: []network.RealTimePullTarget{targetData}, + Payload: network.RealTimePullPayload{ + Targets: targetsData, }, } return conn.WriteJSON(response) diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 2b147df..007c70d 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -122,7 +122,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -133,7 +133,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -146,7 +146,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -157,7 +157,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -170,7 +170,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -181,7 +181,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, @@ -195,7 +195,7 @@ func RealTimeSubHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), - PayLoad: network.RealTimeSubPayload{ + Payload: network.RealTimeSubPayload{ ClientID: clientID, TargetResults: results, }, diff --git a/main.go b/main.go index 4ac3ac3..b9872e0 100644 --- a/main.go +++ b/main.go @@ -22,11 +22,11 @@ import ( "modelRT/middleware" "modelRT/model" "modelRT/pool" + realtimedata "modelRT/real-time-data" "modelRT/router" "modelRT/util" - realtimedata "modelRT/real-time-data" - + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/gin-gonic/gin" "github.com/panjf2000/ants/v2" swaggerFiles "github.com/swaggo/files" @@ -146,11 +146,18 @@ func main() { } defer anchorRealTimePool.Release() + // TODO 配置文件中增加 kafka 配置 // init cancel context cancelCtx, cancel := context.WithCancel(ctx) defer cancel() - // init real time data receive channel - go realtimedata.ReceiveChan(cancelCtx) + customerConf := &kafka.ConfigMap{ + "bootstrap.servers": modelRTConfig.KafkaConfig.Servers, + "group.id": modelRTConfig.KafkaConfig.GroupID, + "auto.offset.reset": modelRTConfig.KafkaConfig.AutoOffsetReset, + "enable.auto.commit": modelRTConfig.KafkaConfig.EnableAutoCommit, + } + + go realtimedata.ReceiveChan(cancelCtx, customerConf, []string{modelRTConfig.KafkaConfig.Topic}, modelRTConfig.KafkaConfig.ReadMessageTimeDuration) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres @@ -170,10 +177,6 @@ func main() { return nil }) - // TODO 完成订阅数据分析 - // TODO 暂时屏蔽完成 swagger 启动测试 - // go realtimedata.RealTimeDataComputer(ctx, nil, []string{}, "") - // use release mode in productio // gin.SetMode(gin.ReleaseMode) engine := gin.New() diff --git a/network/measurement_request.go b/network/measurement_request.go index f69b86e..a687d72 100644 --- a/network/measurement_request.go +++ b/network/measurement_request.go @@ -9,5 +9,5 @@ type MeasurementGetRequest struct { // MeasurementRecommendRequest defines the request payload for an measurement recommend type MeasurementRecommendRequest struct { - Input string `json:"input" example:"trans"` + Input string `form:"input,omitempty" example:"trans"` } diff --git a/network/response.go b/network/response.go index ea24226..6415919 100644 --- a/network/response.go +++ b/network/response.go @@ -5,14 +5,14 @@ package network type FailureResponse struct { Code int `json:"code" example:"500"` Msg string `json:"msg" example:"failed to get recommend data from redis"` - PayLoad any `json:"payload" swaggertype:"object"` + Payload any `json:"payload" swaggertype:"object"` } // SuccessResponse define struct of standard successful API response format type SuccessResponse struct { Code int `json:"code" example:"200"` Msg string `json:"msg" example:"success"` - PayLoad any `json:"payload" swaggertype:"object"` + Payload any `json:"payload" swaggertype:"object"` } // MeasurementRecommendPayload define struct of represents the data payload for the successful recommendation response. diff --git a/real-time-data/kafka.go b/real-time-data/kafka.go deleted file mode 100644 index d6b857c..0000000 --- a/real-time-data/kafka.go +++ /dev/null @@ -1,63 +0,0 @@ -// Package realtimedata define real time data operation functions -package realtimedata - -import ( - "context" - "time" - - "modelRT/logger" - - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// RealTimeDataComputer continuously processing real-time data from Kafka specified topics -func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, topics []string, duration string) { - // context for graceful shutdown - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // setup a channel to listen for interrupt signals - // TODO 将中断信号放到入参中 - interrupt := make(chan struct{}, 1) - - // read message (-1 means wait indefinitely) - timeoutDuration, err := time.ParseDuration(duration) - - // create a new consumer - consumer, err := kafka.NewConsumer(&consumerConfig) - if err != nil { - logger.Error(ctx, "init kafka consume by config failed", "config", consumerConfig, "error", err) - } - - // subscribe to the topic - err = consumer.SubscribeTopics(topics, nil) - if err != nil { - logger.Error(ctx, "subscribe to the topic failed", "topic", topics, "error", err) - } - - // start a goroutine to handle shutdown - go func() { - <-interrupt - cancel() - consumer.Close() - }() - - // continuously read messages from Kafka - for { - msg, err := consumer.ReadMessage(timeoutDuration) - if err != nil { - if ctx.Err() == context.Canceled { - logger.Info(ctx, "context canceled, stopping read loop") - break - } - logger.Error(ctx, "consumer read message failed", "error", err) - continue - } - - // TODO 使用 ants.pool处理 kafka 的订阅数据 - _, err = consumer.CommitMessage(msg) - if err != nil { - logger.Error(ctx, "manual submission information failed", "message", msg, "error", err) - } - } -} diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go new file mode 100644 index 0000000..c345253 --- /dev/null +++ b/real-time-data/real_time_data_computing.go @@ -0,0 +1,208 @@ +// Package realtimedata define real time data operation functions +package realtimedata + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "modelRT/config" + "modelRT/constants" + "modelRT/diagram" + "modelRT/logger" + "modelRT/network" + "modelRT/pool" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// RealTimeDataChan define channel of real time data receive +var RealTimeDataChan chan network.RealTimeDataReceiveRequest + +func init() { + RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100) +} + +// ReceiveChan define func of real time data receive and process +func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration string) { + fmt.Println(topics, duration) + consumer, err := kafka.NewConsumer(consumerConfig) + if err != nil { + logger.Error(ctx, "create kafka consumer failed", "error", err) + return + } + defer consumer.Close() + + err = consumer.SubscribeTopics(topics, nil) + if err != nil { + logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err) + return + } + + logger.Info(ctx, "start consuming from kafka", "topic", topics) + + batchSize := 100 + batchTimeout := 2 * time.Second + messages := make([]*kafka.Message, 0, batchSize) + lastCommit := time.Now() + + for { + select { + case <-ctx.Done(): + return + case realTimeData := <-RealTimeDataChan: + componentUUID := realTimeData.PayLoad.ComponentUUID + component, err := diagram.GetComponentMap(componentUUID) + if err != nil { + logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err) + continue + } + + componentType := component.Type + if componentType != constants.DemoType { + logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID) + continue + } + + var anchorName string + var compareValUpperLimit, compareValLowerLimit float64 + var anchorRealTimeData []float64 + var calculateFunc func(archorValue float64, args ...float64) float64 + + // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData) + + for _, param := range realTimeData.PayLoad.Values { + anchorRealTimeData = append(anchorRealTimeData, param.Value) + } + + anchorConfig := config.AnchorParamConfig{ + AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + ComponentUUID: componentUUID, + AnchorName: anchorName, + CompareValUpperLimit: compareValUpperLimit, + CompareValLowerLimit: compareValLowerLimit, + AnchorRealTimeData: anchorRealTimeData, + }, + CalculateFunc: calculateFunc, + CalculateParams: []float64{}, + } + anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) + if err != nil { + logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err) + continue + } + anchorChan <- anchorConfig + default: + msg, err := consumer.ReadMessage(batchTimeout) + if err != nil { + if err.(kafka.Error).Code() == kafka.ErrTimedOut { + // 超时时处理累积的消息 + if len(messages) > 0 { + processMessageBatch(ctx, messages) + consumer.Commit() + messages = messages[:0] + } + continue + } + logger.Error(ctx, "read message from kafka failed", "error", err) + continue + } + + messages = append(messages, msg) + + // TODO 达到批处理大小或超时时间时处理消息 + if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout { + processMessageBatch(ctx, messages) + consumer.Commit() + messages = messages[:0] + lastCommit = time.Now() + } + } + } +} + +type RealTimeDataPayload struct { + ComponentUUID string + Values []float64 +} + +type RealTimeData struct { + Payload RealTimeDataPayload +} + +func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) { + var realTimeData RealTimeData + err := json.Unmarshal(msgValue, &realTimeData) + if err != nil { + return nil, fmt.Errorf("json unmarshal failed: %w", err) + } + return &realTimeData, nil +} + +func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { + componentUUID := realTimeData.Payload.ComponentUUID + component, err := diagram.GetComponentMap(componentUUID) + if err != nil { + logger.Error(ctx, "query component info from diagram map by component id failed", + "component_uuid", componentUUID, "error", err) + return + } + + componentType := component.Type + if componentType != constants.DemoType { + logger.Error(ctx, "can not process real time data of component type not equal DemoType", + "component_uuid", componentUUID) + return + } + + var anchorName string + var compareValUpperLimit, compareValLowerLimit float64 + var anchorRealTimeData []float64 + var calculateFunc func(archorValue float64, args ...float64) float64 + + // 收集实时数据 + for _, param := range realTimeData.Payload.Values { + anchorRealTimeData = append(anchorRealTimeData, param) + } + + anchorConfig := config.AnchorParamConfig{ + AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + ComponentUUID: componentUUID, + AnchorName: anchorName, + CompareValUpperLimit: compareValUpperLimit, + CompareValLowerLimit: compareValLowerLimit, + AnchorRealTimeData: anchorRealTimeData, + }, + CalculateFunc: calculateFunc, + CalculateParams: []float64{}, + } + + anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) + if err != nil { + logger.Error(ctx, "get anchor param chan failed", + "component_uuid", componentUUID, "error", err) + return + } + + // TODO 使用select避免channel阻塞 + select { + case anchorChan <- anchorConfig: + // 成功发送 + case <-ctx.Done(): + logger.Info(ctx, "context done while sending to anchor chan") + case <-time.After(5 * time.Second): + logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID) + } +} + +func processMessageBatch(ctx context.Context, messages []*kafka.Message) { + for _, msg := range messages { + realTimeData, err := parseKafkaMessage(msg.Value) + if err != nil { + logger.Error(ctx, "parse kafka message failed", "error", err) + continue + } + processRealTimeData(ctx, realTimeData) + } +} diff --git a/real-time-data/real_time_data_receive.go b/real-time-data/real_time_data_receive.go deleted file mode 100644 index 5d9f1f3..0000000 --- a/real-time-data/real_time_data_receive.go +++ /dev/null @@ -1,73 +0,0 @@ -// Package realtimedata define real time data operation functions -package realtimedata - -import ( - "context" - - "modelRT/config" - "modelRT/constants" - "modelRT/diagram" - "modelRT/logger" - "modelRT/network" - "modelRT/pool" -) - -// RealTimeDataChan define channel of real time data receive -var RealTimeDataChan chan network.RealTimeDataReceiveRequest - -func init() { - RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100) -} - -// ReceiveChan define func of real time data receive and process -func ReceiveChan(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case realTimeData := <-RealTimeDataChan: - componentUUID := realTimeData.PayLoad.ComponentUUID - component, err := diagram.GetComponentMap(componentUUID) - if err != nil { - logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err) - continue - } - - componentType := component.Type - if componentType != constants.DemoType { - logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID) - continue - } - - var anchorName string - var compareValUpperLimit, compareValLowerLimit float64 - var anchorRealTimeData []float64 - var calculateFunc func(archorValue float64, args ...float64) float64 - - // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData) - - for _, param := range realTimeData.PayLoad.Values { - anchorRealTimeData = append(anchorRealTimeData, param.Value) - } - - anchorConfig := config.AnchorParamConfig{ - AnchorParamBaseConfig: config.AnchorParamBaseConfig{ - ComponentUUID: componentUUID, - AnchorName: anchorName, - CompareValUpperLimit: compareValUpperLimit, - CompareValLowerLimit: compareValLowerLimit, - AnchorRealTimeData: anchorRealTimeData, - }, - CalculateFunc: calculateFunc, - CalculateParams: []float64{}, - } - anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) - if err != nil { - logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err) - continue - } - anchorChan <- anchorConfig - default: - } - } -} diff --git a/router/monitor.go b/router/monitor.go index 48056ca..1b4fa2c 100644 --- a/router/monitor.go +++ b/router/monitor.go @@ -10,6 +10,6 @@ import ( // registerMonitorRoutes define func of register monitordata routes func registerMonitorRoutes(rg *gin.RouterGroup) { g := rg.Group("/monitors/") - g.POST("/data/subscriptions", handler.RealTimeSubHandler) - g.GET("/data/realtime/stream/:clientID", handler.PullRealTimeDataHandler) + g.POST("data/subscriptions", handler.RealTimeSubHandler) + g.GET("data/realtime/stream/:clientID", handler.PullRealTimeDataHandler) }