diff --git a/docs/docs.go b/docs/docs.go index 5e2be9a..ab3d020 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -23,6 +23,70 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/data/realtime": { + "get": { + "description": "根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "RealTime Component" + ], + "summary": "获取实时测点数据", + "parameters": [ + { + "type": "string", + "description": "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)", + "name": "token", + "in": "query", + "required": true + }, + { + "type": "integer", + "description": "查询起始时间 (Unix时间戳, e.g., 1761008266)", + "name": "begin", + "in": "query", + "required": true + }, + { + "type": "integer", + "description": "查询结束时间 (Unix时间戳, e.g., 1761526675)", + "name": "end", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "返回实时数据成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeDataPayload" + } + } + } + ] + } + }, + "400": { + "description": "返回实时数据失败", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, "/measurement/recommend": { "get": { "description": "根据用户输入的字符串,从 Redis 中查询可能的测量点或结构路径,并提供推荐列表。", @@ -164,6 +228,15 @@ const docTemplate = `{ } } }, + "network.RealTimeDataPayload": { + "type": "object", + "properties": { + "sub_pos": { + "description": "TODO 增加example tag", + "type": "object" + } + } + }, "network.SuccessResponse": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index d638ab4..92f20fa 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -17,6 +17,70 @@ "host": "localhost:8080", "basePath": "/api/v1", "paths": { + "/data/realtime": { + "get": { + "description": "根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "RealTime Component" + ], + "summary": "获取实时测点数据", + "parameters": [ + { + "type": "string", + "description": "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)", + "name": "token", + "in": "query", + "required": true + }, + { + "type": "integer", + "description": "查询起始时间 (Unix时间戳, e.g., 1761008266)", + "name": "begin", + "in": "query", + "required": true + }, + { + "type": "integer", + "description": "查询结束时间 (Unix时间戳, e.g., 1761526675)", + "name": "end", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "返回实时数据成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeDataPayload" + } + } + } + ] + } + }, + "400": { + "description": "返回实时数据失败", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, "/measurement/recommend": { "get": { "description": "根据用户输入的字符串,从 Redis 中查询可能的测量点或结构路径,并提供推荐列表。", @@ -158,6 +222,15 @@ } } }, + "network.RealTimeDataPayload": { + "type": "object", + "properties": { + "sub_pos": { + "description": "TODO 增加example tag", + "type": "object" + } + } + }, "network.SuccessResponse": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index a9bd578..e540f45 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -34,6 +34,12 @@ definitions: example: trans type: string type: object + network.RealTimeDataPayload: + properties: + sub_pos: + description: TODO 增加example tag + type: object + type: object network.SuccessResponse: properties: code: @@ -58,6 +64,46 @@ info: title: ModelRT 实时模型服务 API 文档 version: "1.0" paths: + /data/realtime: + get: + consumes: + - application/json + description: 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据 + parameters: + - description: 测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms) + in: query + name: token + required: true + type: string + - description: 查询起始时间 (Unix时间戳, e.g., 1761008266) + in: query + name: begin + required: true + type: integer + - description: 查询结束时间 (Unix时间戳, e.g., 1761526675) + in: query + name: end + required: true + type: integer + produces: + - application/json + responses: + "200": + description: 返回实时数据成功 + schema: + allOf: + - $ref: '#/definitions/network.SuccessResponse' + - properties: + payload: + $ref: '#/definitions/network.RealTimeDataPayload' + type: object + "400": + description: 返回实时数据失败 + schema: + $ref: '#/definitions/network.FailureResponse' + summary: 获取实时测点数据 + tags: + - RealTime Component /measurement/recommend: get: consumes: diff --git a/handler/measurement_recommend.go b/handler/measurement_recommend.go index debafa6..6e5a31e 100644 --- a/handler/measurement_recommend.go +++ b/handler/measurement_recommend.go @@ -35,10 +35,12 @@ import ( // } // // @Failure 400 {object} network.FailureResponse "返回推荐列表失败" -// @Example 400 { -// "code": 400, -// "msg": "failed to get recommend data from redis", -// } +// +// @Example 400 { +// "code": 400, +// "msg": "failed to get recommend data from redis", +// } +// // @Router /measurement/recommend [get] func MeasurementRecommendHandler(c *gin.Context) { var request network.MeasurementRecommendRequest @@ -102,11 +104,6 @@ func MeasurementRecommendHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - // PayLoad: map[string]any{ - // "input": request.Input, - // "offset": finalOffset, - // "recommended_list": resultRecommends, - // }, PayLoad: &network.MeasurementRecommendPayload{ Input: request.Input, Offset: finalOffset, diff --git a/handler/real_time_data_query.go b/handler/real_time_data_query.go index 0a4c084..661f95c 100644 --- a/handler/real_time_data_query.go +++ b/handler/real_time_data_query.go @@ -2,46 +2,79 @@ package handler import ( + "context" "fmt" + "log" "net/http" - "strconv" + "net/url" "modelRT/alert" "modelRT/constants" "modelRT/logger" "modelRT/network" + "github.com/bitly/go-simplejson" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" ) // QueryRealTimeDataHandler define query real time data process API +// @Summary 获取实时测点数据 +// @Description 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据 +// @Tags RealTime Component +// @Accept json +// @Produce json +// @Param token query string true "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)" +// @Param begin query int true "查询起始时间 (Unix时间戳, e.g., 1761008266)" +// @Param end query int true "查询结束时间 (Unix时间戳, e.g., 1761526675)" +// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeDataPayload} "返回实时数据成功" +// +// @Example 200 { +// "code": 200, +// "msg": "success", +// "payload": { +// "input": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", +// "sub_pos": [ +// { +// "time": 1736305467506000000, +// "value": 1 +// } +// ] +// } +// } +// +// @Failure 400 {object} network.FailureResponse "返回实时数据失败" +// +// @Example 400 { +// "code": 400, +// "msg": "failed to get real time data from dataRT", +// } +// +// @Router /data/realtime [get] func QueryRealTimeDataHandler(c *gin.Context) { token := c.Query("token") beginStr := c.Query("begin") - begin, err := strconv.Atoi(beginStr) - if err != nil { - logger.Error(c, "convert begin param from string to int failed", "error", err) - - resp := network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), - } - c.JSON(http.StatusOK, resp) - } endStr := c.Query("end") - end, err := strconv.Atoi(endStr) - if err != nil { - logger.Error(c, "convert end param from string to int failed", "error", err) - resp := network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), + // TODO 启动一个goroutine用来开启与dataRT服务的websocket服务,并使用channel 将数据传递回来,本地api中启动并维持与前端UI的websocket连接 + var transportChannel chan network.RealTimeDataPoint + var closeChannel chan struct{} + + params := url.Values{} + params.Set("token", token) + params.Set("begin", beginStr) + params.Set("end", endStr) + go receiveRealTimeDataByWebSocket(c, params, transportChannel, closeChannel) + + for { + select { + case data := <-transportChannel: + fmt.Println("receive real time data:", data) + case <-closeChannel: + fmt.Println("websocket connection closed") } - c.JSON(http.StatusOK, resp) } - fmt.Println(token, begin, end) - // TODO parse token to dataRT query params var level int var targetLevel constants.AlertLevel alertManger := alert.GetAlertMangerInstance() @@ -57,3 +90,87 @@ func QueryRealTimeDataHandler(c *gin.Context) { } c.JSON(http.StatusOK, resp) } + +func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan network.RealTimeDataPoint, closeChannel chan struct{}) { + serverURL := "ws://127.0.0.1:8888/ws/points" + u, err := url.Parse(serverURL) + if err != nil { + logger.Error(ctx, "parse url failed", "error", err) + } + + q := u.Query() + for key, values := range params { + for _, value := range values { + q.Add(key, value) + } + } + u.RawQuery = q.Encode() + finalServerURL := u.String() + + conn, resp, err := websocket.DefaultDialer.Dial(finalServerURL, nil) + if err != nil { + logger.Error(ctx, "dialing websocket server failed", "error", err) + if resp != nil { + // TODO 优化错误判断 + log.Printf("HTTP Response Status: %s", resp.Status) + } + return + } + defer conn.Close() + + for { + msgType, message, err := conn.ReadMessage() + if err != nil { + // check if it is an expected shutdown error + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + logger.Info(ctx, "connection closed normally") + } else { + logger.Error(ctx, "abnormal disconnection from websocket server", "err", err) + } + close(closeChannel) + break + } + logger.Info(ctx, "received info from dataRT server", "msg_type", messageTypeToString(msgType), "message", string(message)) + // parse message by xxx struct + var point network.RealTimeDataPoint + js, err := simplejson.NewJson(message) + if err != nil { + logger.Error(ctx, "parse real time data from message failed", "message", string(message), "err", err) + continue + } + + time, err := js.Get("time").Int64() + if err != nil { + logger.Error(ctx, "parse time data from message json info", "time", js.Get("time"), "err", err) + continue + } + point.Time = time + + value, err := js.Get("value").Float64() + if err != nil { + logger.Error(ctx, "parse value data from message json info", "value", js.Get("value"), "err", err) + continue + } + point.Value = value + transportChannel <- point + } + return +} + +// messageTypeToString define func of auxiliary to convert message type to string +func messageTypeToString(t int) string { + switch t { + case websocket.TextMessage: + return "TEXT" + case websocket.BinaryMessage: + return "BINARY" + case websocket.PingMessage: + return "PING" + case websocket.PongMessage: + return "PONG" + case websocket.CloseMessage: + return "CLOSE" + default: + return "UNKNOWN" + } +} diff --git a/network/real_time_data_request.go b/network/real_time_data_request.go index 497ac1b..1e2ead4 100644 --- a/network/real_time_data_request.go +++ b/network/real_time_data_request.go @@ -8,13 +8,19 @@ type RealTimeDataReceiveRequest struct { // RealTimeDataReceivePayload defines request payload of real time data receive api type RealTimeDataReceivePayload struct { - ComponentUUID string `json:"component_uuid"` - Point string `json:"point"` - Values []RealTimeDataReceiveParam `json:"values"` + ComponentUUID string `json:"component_uuid"` + Point string `json:"point"` + Values []RealTimeDataPoint `json:"values"` } -// RealTimeDataReceiveParam defines request param of real time data receive api -type RealTimeDataReceiveParam struct { - Time int64 `json:"time"` - Value float64 `json:"value"` +// RealTimeDataPoint define struct of real time data point +type RealTimeDataPoint struct { + Time int64 `json:"time" example:"1678886400"` + Value float64 `json:"value" example:"123.1"` +} + +// RealTimeDataPayload define struct of real time data payload +type RealTimeDataPayload struct { + // TODO 增加example tag + RealTimeDataPoints []RealTimeDataPoint `json:"sub_pos" swaggertype:"object"` }