optimize real time data query api
This commit is contained in:
parent
09700a86ee
commit
2584f6dacb
|
|
@ -4,20 +4,29 @@ package handler
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
var wsUpgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
// CheckOrigin 必须返回 true,否则浏览器会拒绝连接
|
||||
CheckOrigin: func(_ *http.Request) bool {
|
||||
// 在生产环境中,应该更严格地检查 Origin 头部
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// QueryRealTimeDataHandler define query real time data process API
|
||||
// @Summary 获取实时测点数据
|
||||
// @Description 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据
|
||||
|
|
@ -52,13 +61,35 @@ import (
|
|||
//
|
||||
// @Router /data/realtime [get]
|
||||
func QueryRealTimeDataHandler(c *gin.Context) {
|
||||
token := c.Query("token")
|
||||
beginStr := c.Query("begin")
|
||||
endStr := c.Query("end")
|
||||
req := c.Request
|
||||
queryParams := req.URL.Query()
|
||||
token := queryParams.Get("token")
|
||||
beginStr := queryParams.Get("begin")
|
||||
endStr := queryParams.Get("end")
|
||||
|
||||
// TODO 启动一个goroutine用来开启与dataRT服务的websocket服务,并使用channel 将数据传递回来,本地api中启动并维持与前端UI的websocket连接
|
||||
var transportChannel chan network.RealTimeDataPoint
|
||||
var closeChannel chan struct{}
|
||||
if token == "" || beginStr == "" || endStr == "" {
|
||||
err := fmt.Errorf("missing required parameters: token=%s, begin=%s, end=%s", token, beginStr, endStr)
|
||||
|
||||
logger.Error(c, "failed to unmarshal measurement recommend request", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// TODO 增加对 token 的解析
|
||||
|
||||
conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// start a goroutine to open a websocket service with the dataRT service and use the channel to pass data back. Start and maintain the websocket connection with the front-end UI in the local api
|
||||
transportChannel := make(chan []any, 100)
|
||||
closeChannel := make(chan struct{})
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("token", token)
|
||||
|
|
@ -69,29 +100,37 @@ func QueryRealTimeDataHandler(c *gin.Context) {
|
|||
for {
|
||||
select {
|
||||
case data := <-transportChannel:
|
||||
fmt.Println("receive real time data:", data)
|
||||
respByte, err := jsoniter.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Error(c, "marshal real time data to bytes failed", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = conn.WriteMessage(websocket.TextMessage, respByte)
|
||||
if err != nil {
|
||||
logger.Error(c, "write message to websocket connection failed", "error", err)
|
||||
continue
|
||||
}
|
||||
case <-closeChannel:
|
||||
fmt.Println("websocket connection closed")
|
||||
logger.Info(c, "data receiving goroutine has been closed")
|
||||
// TODO 优化时间控制
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "the session ended normally"), deadline)
|
||||
if err != nil {
|
||||
logger.Error(c, "sending close control message failed", "error", err)
|
||||
}
|
||||
// gracefully close session processing
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
logger.Error(c, "websocket conn closed failed", "error", err)
|
||||
}
|
||||
logger.Info(c, "websocket connection closed successfully.")
|
||||
}
|
||||
}
|
||||
|
||||
var level int
|
||||
var targetLevel constants.AlertLevel
|
||||
alertManger := alert.GetAlertMangerInstance()
|
||||
targetLevel = constants.AlertLevel(level)
|
||||
events := alertManger.GetRangeEventsByLevel(targetLevel)
|
||||
|
||||
resp := network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
PayLoad: map[string]interface{}{
|
||||
"events": events,
|
||||
},
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan network.RealTimeDataPoint, closeChannel chan struct{}) {
|
||||
// receiveRealTimeDataByWebSocket define func of receive real time data by websocket
|
||||
func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan []any, closeChannel chan struct{}) {
|
||||
serverURL := "ws://127.0.0.1:8888/ws/points"
|
||||
u, err := url.Parse(serverURL)
|
||||
if err != nil {
|
||||
|
|
@ -111,8 +150,7 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran
|
|||
if err != nil {
|
||||
logger.Error(ctx, "dialing websocket server failed", "error", err)
|
||||
if resp != nil {
|
||||
// TODO 优化错误判断
|
||||
log.Printf("HTTP Response Status: %s", resp.Status)
|
||||
logger.Error(ctx, "websocket server response", "status", resp.Status)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
@ -131,28 +169,19 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran
|
|||
break
|
||||
}
|
||||
logger.Info(ctx, "received info from dataRT server", "msg_type", messageTypeToString(msgType), "message", string(message))
|
||||
// parse message by xxx struct
|
||||
var point network.RealTimeDataPoint
|
||||
|
||||
js, err := simplejson.NewJson(message)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse real time data from message failed", "message", string(message), "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
time, err := js.Get("time").Int64()
|
||||
subPoss, err := js.Get("sub_pos").Array()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse time data from message json info", "time", js.Get("time"), "err", err)
|
||||
logger.Error(ctx, "parse sub_pos struct from message json info", "sub_pos", js.Get("sub_pos"), "err", err)
|
||||
continue
|
||||
}
|
||||
point.Time = time
|
||||
|
||||
value, err := js.Get("value").Float64()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse value data from message json info", "value", js.Get("value"), "err", err)
|
||||
continue
|
||||
}
|
||||
point.Value = value
|
||||
transportChannel <- point
|
||||
transportChannel <- subPoss
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue