diff --git a/handler/real_time_data_query.go b/handler/real_time_data_query.go index 661f95c..f0a4610 100644 --- a/handler/real_time_data_query.go +++ b/handler/real_time_data_query.go @@ -4,20 +4,29 @@ package handler import ( "context" "fmt" - "log" "net/http" "net/url" + "time" - "modelRT/alert" - "modelRT/constants" "modelRT/logger" "modelRT/network" "github.com/bitly/go-simplejson" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + jsoniter "github.com/json-iterator/go" ) +var wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // CheckOrigin 必须返回 true,否则浏览器会拒绝连接 + CheckOrigin: func(_ *http.Request) bool { + // 在生产环境中,应该更严格地检查 Origin 头部 + return true + }, +} + // QueryRealTimeDataHandler define query real time data process API // @Summary 获取实时测点数据 // @Description 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据 @@ -52,13 +61,35 @@ import ( // // @Router /data/realtime [get] func QueryRealTimeDataHandler(c *gin.Context) { - token := c.Query("token") - beginStr := c.Query("begin") - endStr := c.Query("end") + req := c.Request + queryParams := req.URL.Query() + token := queryParams.Get("token") + beginStr := queryParams.Get("begin") + endStr := queryParams.Get("end") - // TODO 启动一个goroutine用来开启与dataRT服务的websocket服务,并使用channel 将数据传递回来,本地api中启动并维持与前端UI的websocket连接 - var transportChannel chan network.RealTimeDataPoint - var closeChannel chan struct{} + if token == "" || beginStr == "" || endStr == "" { + err := fmt.Errorf("missing required parameters: token=%s, begin=%s, end=%s", token, beginStr, endStr) + + logger.Error(c, "failed to unmarshal measurement recommend request", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return + } + + // TODO 增加对 token 的解析 + + conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err) + return + } + defer conn.Close() + + // start a goroutine to open a websocket service with the dataRT service and use the channel to pass data back. Start and maintain the websocket connection with the front-end UI in the local api + transportChannel := make(chan []any, 100) + closeChannel := make(chan struct{}) params := url.Values{} params.Set("token", token) @@ -69,29 +100,37 @@ func QueryRealTimeDataHandler(c *gin.Context) { for { select { case data := <-transportChannel: - fmt.Println("receive real time data:", data) + respByte, err := jsoniter.Marshal(data) + if err != nil { + logger.Error(c, "marshal real time data to bytes failed", "error", err) + continue + } + + err = conn.WriteMessage(websocket.TextMessage, respByte) + if err != nil { + logger.Error(c, "write message to websocket connection failed", "error", err) + continue + } case <-closeChannel: - fmt.Println("websocket connection closed") + logger.Info(c, "data receiving goroutine has been closed") + // TODO 优化时间控制 + deadline := time.Now().Add(5 * time.Second) + err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "the session ended normally"), deadline) + if err != nil { + logger.Error(c, "sending close control message failed", "error", err) + } + // gracefully close session processing + err = conn.Close() + if err != nil { + logger.Error(c, "websocket conn closed failed", "error", err) + } + logger.Info(c, "websocket connection closed successfully.") } } - - var level int - var targetLevel constants.AlertLevel - alertManger := alert.GetAlertMangerInstance() - targetLevel = constants.AlertLevel(level) - events := alertManger.GetRangeEventsByLevel(targetLevel) - - resp := network.SuccessResponse{ - Code: http.StatusOK, - Msg: "success", - PayLoad: map[string]interface{}{ - "events": events, - }, - } - c.JSON(http.StatusOK, resp) } -func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan network.RealTimeDataPoint, closeChannel chan struct{}) { +// receiveRealTimeDataByWebSocket define func of receive real time data by websocket +func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan []any, closeChannel chan struct{}) { serverURL := "ws://127.0.0.1:8888/ws/points" u, err := url.Parse(serverURL) if err != nil { @@ -111,8 +150,7 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran if err != nil { logger.Error(ctx, "dialing websocket server failed", "error", err) if resp != nil { - // TODO 优化错误判断 - log.Printf("HTTP Response Status: %s", resp.Status) + logger.Error(ctx, "websocket server response", "status", resp.Status) } return } @@ -131,28 +169,19 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran break } logger.Info(ctx, "received info from dataRT server", "msg_type", messageTypeToString(msgType), "message", string(message)) - // parse message by xxx struct - var point network.RealTimeDataPoint + js, err := simplejson.NewJson(message) if err != nil { logger.Error(ctx, "parse real time data from message failed", "message", string(message), "err", err) continue } - time, err := js.Get("time").Int64() + subPoss, err := js.Get("sub_pos").Array() if err != nil { - logger.Error(ctx, "parse time data from message json info", "time", js.Get("time"), "err", err) + logger.Error(ctx, "parse sub_pos struct from message json info", "sub_pos", js.Get("sub_pos"), "err", err) continue } - point.Time = time - - value, err := js.Get("value").Float64() - if err != nil { - logger.Error(ctx, "parse value data from message json info", "value", js.Get("value"), "err", err) - continue - } - point.Value = value - transportChannel <- point + transportChannel <- subPoss } return }