From 43dece39c1f50ba82e3f3c35cd43693fff3a93b0 Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 20 Jan 2025 16:20:21 +0800 Subject: [PATCH] add handler of dataRT real time data push --- go.mod | 1 + go.sum | 2 + handler/real_time_data.go | 102 ++++++++++++++++++ main.go | 13 ++- network/real_time_data_request.go | 20 ++++ network/real_time_data_response.go | 9 ++ .../{data_polling.go => anchor_param.go} | 5 +- real-time-data/real_time_data_receive.go | 29 +++++ 8 files changed, 176 insertions(+), 5 deletions(-) create mode 100644 handler/real_time_data.go create mode 100644 network/real_time_data_request.go create mode 100644 network/real_time_data_response.go rename real-time-data/{data_polling.go => anchor_param.go} (64%) create mode 100644 real-time-data/real_time_data_receive.go diff --git a/go.mod b/go.mod index 48f0f7c..e5f2b8c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/gin-gonic/gin v1.10.0 github.com/gofrs/uuid v4.4.0+incompatible + github.com/gorilla/websocket v1.5.3 github.com/json-iterator/go v1.1.12 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/panjf2000/ants/v2 v2.10.0 diff --git a/go.sum b/go.sum index be2ecba..da45600 100644 --- a/go.sum +++ b/go.sum @@ -115,6 +115,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/handler/real_time_data.go b/handler/real_time_data.go new file mode 100644 index 0000000..b5468b3 --- /dev/null +++ b/handler/real_time_data.go @@ -0,0 +1,102 @@ +package handler + +import ( + "modelRT/logger" + "modelRT/network" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + jsoniter "github.com/json-iterator/go" + "go.uber.org/zap" + + realtimedata "modelRT/real-time-data" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// RealTimeDataReceivehandler define real time data receive and process API +func RealTimeDataReceivehandler(c *gin.Context) { + logger := logger.GetLoggerInstance() + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + logger.Error("upgrade http protocol to websocket protocal failed", zap.Error(err)) + return + } + defer conn.Close() + + for { + messageType, p, err := conn.ReadMessage() + if err != nil { + logger.Error("read message from websocket connection failed", zap.Error(err)) + + respByte := processResponse(-1, "read message from websocket connection failed", nil) + if len(respByte) == 0 { + logger.Error("process message from byte failed", zap.Error(err)) + continue + } + + err = conn.WriteMessage(messageType, respByte) + if err != nil { + logger.Error("write message to websocket connection failed", zap.Error(err)) + continue + } + continue + } + + var request network.RealTimeDataReceiveRequest + err = jsoniter.Unmarshal([]byte(p), &request) + if err != nil { + logger.Error("unmarshal message from byte failed", zap.Error(err)) + + respByte := processResponse(-1, "unmarshal message from byte failed", nil) + if len(respByte) == 0 { + logger.Error("process message from byte failed", zap.Error(err)) + continue + } + + err = conn.WriteMessage(messageType, respByte) + if err != nil { + logger.Error("write message to websocket connection failed", zap.Error(err)) + continue + } + continue + } + + realtimedata.RealTimeDataChan <- request + + payload := map[string]interface{}{ + "component_id": request.PayLoad.ComponentID, + "point": request.PayLoad.Point, + } + respByte := processResponse(0, "success", payload) + if len(respByte) == 0 { + logger.Error("process message from byte failed", zap.Error(err)) + continue + } + + err = conn.WriteMessage(messageType, respByte) + if err != nil { + logger.Error("write message to websocket connection failed", zap.Error(err)) + continue + } + } +} + +func processResponse(code int64, msg string, payload map[string]interface{}) []byte { + resp := network.RealTimeDataReceiveResponse{ + Code: code, + Msg: msg, + PayLoad: payload, + } + + respByte, err := jsoniter.Marshal(resp) + if err != nil { + return []byte("") + } + + return respByte +} diff --git a/main.go b/main.go index ad4d9ce..9afd52e 100644 --- a/main.go +++ b/main.go @@ -13,11 +13,12 @@ import ( "modelRT/logger" "modelRT/middleware" "modelRT/pool" - realtimedata "modelRT/real-time-data" swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" + realtimedata "modelRT/real-time-data" + "github.com/gin-gonic/gin" "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -79,10 +80,13 @@ func main() { } defer pollingPool.Release() - // init data polling + // init cancel context cancelCtx, cancel := context.WithCancel(ctx) defer cancel() - go realtimedata.DataPolling(cancelCtx, pollingPool) + // init anchor channel + go realtimedata.AnchorParamChangeChan(cancelCtx, pollingPool) + // init real time data receive channel + go realtimedata.ReceiveChan(cancelCtx) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres @@ -114,6 +118,9 @@ func main() { engine.POST("/model/diagram_update", handler.CircuitDiagramUpdateHandler) engine.POST("/model/diagram_delete", handler.CircuitDiagramDeleteHandler) + // real time data api + engine.GET("/ws/rtdatas", handler.RealTimeDataReceivehandler) + // anchor api engine.POST("/model/anchor_replace", handler.ComponentAnchorReplaceHandler) diff --git a/network/real_time_data_request.go b/network/real_time_data_request.go new file mode 100644 index 0000000..0a8c63a --- /dev/null +++ b/network/real_time_data_request.go @@ -0,0 +1,20 @@ +// Package network define struct of network operation +package network + +// RealTimeDataReceiveRequest defines request params of real time data receive api +type RealTimeDataReceiveRequest struct { + PayLoad RealTimeDataReceivePayload `json:"payload"` +} + +// RealTimeDataReceivePayload defines request payload of real time data receive api +type RealTimeDataReceivePayload struct { + ComponentID int64 `json:"component_id"` + Point string `json:"point"` + Values []RealTimeDataReceiveParam `json:"values"` +} + +// RealTimeDataReceiveParam defines request param of real time data receive api +type RealTimeDataReceiveParam struct { + Time int64 `json:"time"` + Value float64 `json:"value"` +} diff --git a/network/real_time_data_response.go b/network/real_time_data_response.go new file mode 100644 index 0000000..7a3c179 --- /dev/null +++ b/network/real_time_data_response.go @@ -0,0 +1,9 @@ +// Package network define struct of network operation +package network + +// RealTimeDataReceiveResponse defines response params of real time data receive api +type RealTimeDataReceiveResponse struct { + Code int64 `json:"code"` + Msg string `json:"msg"` + PayLoad map[string]interface{} `json:"payload"` +} diff --git a/real-time-data/data_polling.go b/real-time-data/anchor_param.go similarity index 64% rename from real-time-data/data_polling.go rename to real-time-data/anchor_param.go index ee51e73..db149fa 100644 --- a/real-time-data/data_polling.go +++ b/real-time-data/anchor_param.go @@ -9,14 +9,15 @@ import ( "github.com/panjf2000/ants/v2" ) +// AnchorParamsChan define channel of component anchor param change var AnchorParamsChan chan config.AnchorParamConfig func init() { AnchorParamsChan = make(chan config.AnchorParamConfig, 100) } -// DataPolling perform data polling on dataRT modules based on UUID -func DataPolling(ctx context.Context, pool *ants.PoolWithFunc) { +// AnchorParamChangeChan define func of component anchor param change notification process +func AnchorParamChangeChan(ctx context.Context, pool *ants.PoolWithFunc) { for { select { case <-ctx.Done(): diff --git a/real-time-data/real_time_data_receive.go b/real-time-data/real_time_data_receive.go new file mode 100644 index 0000000..28abf78 --- /dev/null +++ b/real-time-data/real_time_data_receive.go @@ -0,0 +1,29 @@ +// Package realtimedata define real time data operation functions +package realtimedata + +import ( + "context" + "fmt" + + "modelRT/network" +) + +// RealTimeDataChan define channel of real time data receive +var RealTimeDataChan chan network.RealTimeDataReceiveRequest + +func init() { + RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100) +} + +// ReceiveChan define func of real time data receive and process +func ReceiveChan(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case realTimeData := <-RealTimeDataChan: + fmt.Println(realTimeData.PayLoad.ComponentID) + default: + } + } +}