add handler of dataRT real time data push

This commit is contained in:
douxu 2025-01-20 16:20:21 +08:00
parent 0520e9cece
commit 43dece39c1
8 changed files with 176 additions and 5 deletions

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/gofrs/uuid v4.4.0+incompatible github.com/gofrs/uuid v4.4.0+incompatible
github.com/gorilla/websocket v1.5.3
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/natefinch/lumberjack v2.0.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/panjf2000/ants/v2 v2.10.0 github.com/panjf2000/ants/v2 v2.10.0

2
go.sum
View File

@ -115,6 +115,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8= github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=

102
handler/real_time_data.go Normal file
View File

@ -0,0 +1,102 @@
package handler
import (
"modelRT/logger"
"modelRT/network"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
realtimedata "modelRT/real-time-data"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// RealTimeDataReceivehandler define real time data receive and process API
func RealTimeDataReceivehandler(c *gin.Context) {
logger := logger.GetLoggerInstance()
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error("upgrade http protocol to websocket protocal failed", zap.Error(err))
return
}
defer conn.Close()
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
logger.Error("read message from websocket connection failed", zap.Error(err))
respByte := processResponse(-1, "read message from websocket connection failed", nil)
if len(respByte) == 0 {
logger.Error("process message from byte failed", zap.Error(err))
continue
}
err = conn.WriteMessage(messageType, respByte)
if err != nil {
logger.Error("write message to websocket connection failed", zap.Error(err))
continue
}
continue
}
var request network.RealTimeDataReceiveRequest
err = jsoniter.Unmarshal([]byte(p), &request)
if err != nil {
logger.Error("unmarshal message from byte failed", zap.Error(err))
respByte := processResponse(-1, "unmarshal message from byte failed", nil)
if len(respByte) == 0 {
logger.Error("process message from byte failed", zap.Error(err))
continue
}
err = conn.WriteMessage(messageType, respByte)
if err != nil {
logger.Error("write message to websocket connection failed", zap.Error(err))
continue
}
continue
}
realtimedata.RealTimeDataChan <- request
payload := map[string]interface{}{
"component_id": request.PayLoad.ComponentID,
"point": request.PayLoad.Point,
}
respByte := processResponse(0, "success", payload)
if len(respByte) == 0 {
logger.Error("process message from byte failed", zap.Error(err))
continue
}
err = conn.WriteMessage(messageType, respByte)
if err != nil {
logger.Error("write message to websocket connection failed", zap.Error(err))
continue
}
}
}
func processResponse(code int64, msg string, payload map[string]interface{}) []byte {
resp := network.RealTimeDataReceiveResponse{
Code: code,
Msg: msg,
PayLoad: payload,
}
respByte, err := jsoniter.Marshal(resp)
if err != nil {
return []byte("")
}
return respByte
}

13
main.go
View File

@ -13,11 +13,12 @@ import (
"modelRT/logger" "modelRT/logger"
"modelRT/middleware" "modelRT/middleware"
"modelRT/pool" "modelRT/pool"
realtimedata "modelRT/real-time-data"
swaggerFiles "github.com/swaggo/files" swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger" ginSwagger "github.com/swaggo/gin-swagger"
realtimedata "modelRT/real-time-data"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
@ -79,10 +80,13 @@ func main() {
} }
defer pollingPool.Release() defer pollingPool.Release()
// init data polling // init cancel context
cancelCtx, cancel := context.WithCancel(ctx) cancelCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
go realtimedata.DataPolling(cancelCtx, pollingPool) // init anchor channel
go realtimedata.AnchorParamChangeChan(cancelCtx, pollingPool)
// init real time data receive channel
go realtimedata.ReceiveChan(cancelCtx)
postgresDBClient.Transaction(func(tx *gorm.DB) error { postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres // load circuit diagram from postgres
@ -114,6 +118,9 @@ func main() {
engine.POST("/model/diagram_update", handler.CircuitDiagramUpdateHandler) engine.POST("/model/diagram_update", handler.CircuitDiagramUpdateHandler)
engine.POST("/model/diagram_delete", handler.CircuitDiagramDeleteHandler) engine.POST("/model/diagram_delete", handler.CircuitDiagramDeleteHandler)
// real time data api
engine.GET("/ws/rtdatas", handler.RealTimeDataReceivehandler)
// anchor api // anchor api
engine.POST("/model/anchor_replace", handler.ComponentAnchorReplaceHandler) engine.POST("/model/anchor_replace", handler.ComponentAnchorReplaceHandler)

View File

@ -0,0 +1,20 @@
// Package network define struct of network operation
package network
// RealTimeDataReceiveRequest defines request params of real time data receive api
type RealTimeDataReceiveRequest struct {
PayLoad RealTimeDataReceivePayload `json:"payload"`
}
// RealTimeDataReceivePayload defines request payload of real time data receive api
type RealTimeDataReceivePayload struct {
ComponentID int64 `json:"component_id"`
Point string `json:"point"`
Values []RealTimeDataReceiveParam `json:"values"`
}
// RealTimeDataReceiveParam defines request param of real time data receive api
type RealTimeDataReceiveParam struct {
Time int64 `json:"time"`
Value float64 `json:"value"`
}

View File

@ -0,0 +1,9 @@
// Package network define struct of network operation
package network
// RealTimeDataReceiveResponse defines response params of real time data receive api
type RealTimeDataReceiveResponse struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
PayLoad map[string]interface{} `json:"payload"`
}

View File

@ -9,14 +9,15 @@ import (
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
) )
// AnchorParamsChan define channel of component anchor param change
var AnchorParamsChan chan config.AnchorParamConfig var AnchorParamsChan chan config.AnchorParamConfig
func init() { func init() {
AnchorParamsChan = make(chan config.AnchorParamConfig, 100) AnchorParamsChan = make(chan config.AnchorParamConfig, 100)
} }
// DataPolling perform data polling on dataRT modules based on UUID // AnchorParamChangeChan define func of component anchor param change notification process
func DataPolling(ctx context.Context, pool *ants.PoolWithFunc) { func AnchorParamChangeChan(ctx context.Context, pool *ants.PoolWithFunc) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -0,0 +1,29 @@
// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"fmt"
"modelRT/network"
)
// RealTimeDataChan define channel of real time data receive
var RealTimeDataChan chan network.RealTimeDataReceiveRequest
func init() {
RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100)
}
// ReceiveChan define func of real time data receive and process
func ReceiveChan(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case realTimeData := <-RealTimeDataChan:
fmt.Println(realTimeData.PayLoad.ComponentID)
default:
}
}
}