modelRT/handler/real_time_data_query.go

194 lines
5.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"context"
"fmt"
"net/http"
"net/url"
"time"
"modelRT/logger"
"modelRT/network"
"github.com/bitly/go-simplejson"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
)
// wsUpgrader upgrades incoming HTTP requests to websocket connections using
// 1024-byte read and write buffers.
var wsUpgrader = websocket.Upgrader{
	// CheckOrigin must return true, otherwise the browser will reject the connection.
	CheckOrigin: func(*http.Request) bool {
		// In a production environment the Origin header should be checked more strictly.
		return true
	},
	WriteBufferSize: 1024,
	ReadBufferSize:  1024,
}
// QueryRealTimeDataHandler binds the query request, upgrades the HTTP
// connection to a websocket, starts a goroutine that receives real time data
// from the dataRT websocket service, and forwards every received batch to the
// client until the receiver signals completion or a write to the client fails.
//
// @Summary Get real time measurement point data
// @Description Continuously fetch real time measurement point data from the dataRT service for the component token supplied by the user
// @Tags RealTime Component
// @Accept json
// @Produce json
// @Param token query string true "Unique measurement point identifier (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)"
// @Param begin query int true "Query start time (Unix timestamp, e.g., 1761008266)"
// @Param end query int true "Query end time (Unix timestamp, e.g., 1761526675)"
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeDataPayload} "real time data returned successfully"
//
//	@Example 200 {
//	  "code": 200,
//	  "msg": "success",
//	  "payload": {
//	    "input": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
//	    "sub_pos": [
//	      {
//	        "time": 1736305467506000000,
//	        "value": 1
//	      }
//	    ]
//	  }
//	}
//
// @Failure 400 {object} network.FailureResponse "failed to return real time data"
//
//	@Example 400 {
//	  "code": 400,
//	  "msg": "failed to get real time data from dataRT"
//	}
//
// @Router /data/realtime [get]
func QueryRealTimeDataHandler(c *gin.Context) {
	var request network.RealTimeQueryRequest
	// NOTE(review): the annotations above describe query parameters, yet the
	// request is bound from a JSON body — confirm which one clients send.
	if err := c.ShouldBindJSON(&request); err != nil {
		logger.Error(c, "failed to unmarshal real time query request", "error", err)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  err.Error(),
		})
		return
	}

	conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
		return
	}
	// Close the client connection exactly once, on every exit path.
	defer func() {
		if err := conn.Close(); err != nil {
			logger.Error(c, "websocket conn closed failed", "error", err)
			return
		}
		logger.Info(c, "websocket connection closed successfully.")
	}()

	// NOTE(review): debug output of the bound request; consider replacing it
	// with structured logging.
	fmt.Println(request)

	// transportChannel carries data batches from the receiver goroutine;
	// closeChannel is closed by the receiver when it stops.
	transportChannel := make(chan []any, 100)
	closeChannel := make(chan struct{})

	// The receiver gets a copy of the gin context because it may outlive this
	// handler; cancel tells it to stop as soon as the handler returns.
	ctx, cancel := context.WithCancel(c.Copy())
	defer cancel()
	// NOTE(review): forwards the raw URL query parameters to dataRT — confirm
	// this is the intended mapping from the request to the upstream query.
	go receiveRealTimeDataByWebSocket(ctx, c.Request.URL.Query(), transportChannel, closeChannel)

	for {
		select {
		case data := <-transportChannel:
			if err := forwardRealTimeData(c, conn, data); err != nil {
				// The client connection is no longer writable; stop instead of retrying forever.
				return
			}
		case <-closeChannel:
			logger.Info(c, "data receiving goroutine has been closed")

			// Flush batches that were queued before the close signal so they are not dropped.
			for pending := true; pending; {
				select {
				case data := <-transportChannel:
					if err := forwardRealTimeData(c, conn, data); err != nil {
						return
					}
				default:
					pending = false
				}
			}

			// TODO: optimize the time control
			deadline := time.Now().Add(5 * time.Second)
			closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "the session ended normally")
			if err := conn.WriteControl(websocket.CloseMessage, closeMsg, deadline); err != nil {
				logger.Error(c, "sending close control message failed", "error", err)
			}
			// Returning ends the session; the deferred function closes the connection.
			return
		}
	}
}

// forwardRealTimeData marshals one data batch and writes it to the client as a
// text message. A marshal failure is logged and skipped (nil is returned); a
// non-nil error is returned only when the websocket write fails.
func forwardRealTimeData(c *gin.Context, conn *websocket.Conn, data []any) error {
	respByte, err := jsoniter.Marshal(data)
	if err != nil {
		logger.Error(c, "marshal real time data to bytes failed", "error", err)
		return nil
	}
	if err := conn.WriteMessage(websocket.TextMessage, respByte); err != nil {
		logger.Error(c, "write message to websocket connection failed", "error", err)
		return err
	}
	return nil
}
// receiveRealTimeDataByWebSocket dials the dataRT websocket endpoint with
// params appended to its query string, reads messages until the connection
// ends or ctx is cancelled, and sends the "sub_pos" array of each message on
// transportChannel.
//
// closeChannel is closed exactly once when the function returns, on every exit
// path, so the consumer always learns that no more data will arrive. This
// function must be the only closer of closeChannel.
func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan []any, closeChannel chan struct{}) {
	defer close(closeChannel)

	serverURL := "ws://127.0.0.1:8888/ws/points"
	u, err := url.Parse(serverURL)
	if err != nil {
		logger.Error(ctx, "parse url failed", "error", err)
		// u is nil here; continuing would dereference it.
		return
	}

	q := u.Query()
	for key, values := range params {
		for _, value := range values {
			q.Add(key, value)
		}
	}
	u.RawQuery = q.Encode()
	finalServerURL := u.String()

	conn, resp, err := websocket.DefaultDialer.DialContext(ctx, finalServerURL, nil)
	if err != nil {
		logger.Error(ctx, "dialing websocket server failed", "error", err)
		if resp != nil {
			logger.Error(ctx, "websocket server response", "status", resp.Status)
		}
		return
	}
	defer conn.Close()

	// ReadMessage does not observe ctx, so close the connection on
	// cancellation to unblock it; done stops this watcher on normal return.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			_ = conn.Close() // best effort; the read loop reports the resulting error
		case <-done:
		}
	}()

	for {
		msgType, message, err := conn.ReadMessage()
		if err != nil {
			switch {
			case ctx.Err() != nil:
				logger.Info(ctx, "stop receiving real time data because context is done")
			case websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway):
				// expected shutdown initiated by the peer
				logger.Info(ctx, "connection closed normally")
			default:
				logger.Error(ctx, "abnormal disconnection from websocket server", "err", err)
			}
			return
		}

		logger.Info(ctx, "received info from dataRT server", "msg_type", messageTypeToString(msgType), "message", string(message))

		js, err := simplejson.NewJson(message)
		if err != nil {
			logger.Error(ctx, "parse real time data from message failed", "message", string(message), "err", err)
			continue
		}

		subPoss, err := js.Get("sub_pos").Array()
		if err != nil {
			logger.Error(ctx, "parse sub_pos struct from message json info", "sub_pos", js.Get("sub_pos"), "err", err)
			continue
		}

		// Do not block forever on a full channel once the consumer has gone away.
		select {
		case transportChannel <- subPoss:
		case <-ctx.Done():
			return
		}
	}
}
// messageTypeToString returns an upper-case label for a websocket message
// type constant, or "UNKNOWN" when the value is not one of the five known types.
func messageTypeToString(t int) string {
	label := "UNKNOWN"
	switch {
	case t == websocket.TextMessage:
		label = "TEXT"
	case t == websocket.BinaryMessage:
		label = "BINARY"
	case t == websocket.PingMessage:
		label = "PING"
	case t == websocket.PongMessage:
		label = "PONG"
	case t == websocket.CloseMessage:
		label = "CLOSE"
	}
	return label
}