optimize real time data receive api
This commit is contained in:
parent
954203b84d
commit
09700a86ee
73
docs/docs.go
73
docs/docs.go
|
|
@ -23,6 +23,70 @@ const docTemplate = `{
|
|||
"host": "{{.Host}}",
|
||||
"basePath": "{{.BasePath}}",
|
||||
"paths": {
|
||||
"/data/realtime": {
|
||||
"get": {
|
||||
"description": "根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"RealTime Component"
|
||||
],
|
||||
"summary": "获取实时测点数据",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)",
|
||||
"name": "token",
|
||||
"in": "query",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "查询起始时间 (Unix时间戳, e.g., 1761008266)",
|
||||
"name": "begin",
|
||||
"in": "query",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "查询结束时间 (Unix时间戳, e.g., 1761526675)",
|
||||
"name": "end",
|
||||
"in": "query",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "返回实时数据成功",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/network.SuccessResponse"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"payload": {
|
||||
"$ref": "#/definitions/network.RealTimeDataPayload"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "返回实时数据失败",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/network.FailureResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/measurement/recommend": {
|
||||
"get": {
|
||||
"description": "根据用户输入的字符串,从 Redis 中查询可能的测量点或结构路径,并提供推荐列表。",
|
||||
|
|
@ -164,6 +228,15 @@ const docTemplate = `{
|
|||
}
|
||||
}
|
||||
},
|
||||
"network.RealTimeDataPayload": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sub_pos": {
|
||||
"description": "TODO 增加example tag",
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
},
|
||||
"network.SuccessResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,70 @@
|
|||
"host": "localhost:8080",
|
||||
"basePath": "/api/v1",
|
||||
"paths": {
|
||||
"/data/realtime": {
|
||||
"get": {
|
||||
"description": "根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"RealTime Component"
|
||||
],
|
||||
"summary": "获取实时测点数据",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)",
|
||||
"name": "token",
|
||||
"in": "query",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "查询起始时间 (Unix时间戳, e.g., 1761008266)",
|
||||
"name": "begin",
|
||||
"in": "query",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "查询结束时间 (Unix时间戳, e.g., 1761526675)",
|
||||
"name": "end",
|
||||
"in": "query",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "返回实时数据成功",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/network.SuccessResponse"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"payload": {
|
||||
"$ref": "#/definitions/network.RealTimeDataPayload"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "返回实时数据失败",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/network.FailureResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/measurement/recommend": {
|
||||
"get": {
|
||||
"description": "根据用户输入的字符串,从 Redis 中查询可能的测量点或结构路径,并提供推荐列表。",
|
||||
|
|
@ -158,6 +222,15 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"network.RealTimeDataPayload": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"sub_pos": {
|
||||
"description": "TODO 增加example tag",
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
},
|
||||
"network.SuccessResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,12 @@ definitions:
|
|||
example: trans
|
||||
type: string
|
||||
type: object
|
||||
network.RealTimeDataPayload:
|
||||
properties:
|
||||
sub_pos:
|
||||
description: TODO 增加example tag
|
||||
type: object
|
||||
type: object
|
||||
network.SuccessResponse:
|
||||
properties:
|
||||
code:
|
||||
|
|
@ -58,6 +64,46 @@ info:
|
|||
title: ModelRT 实时模型服务 API 文档
|
||||
version: "1.0"
|
||||
paths:
|
||||
/data/realtime:
|
||||
get:
|
||||
consumes:
|
||||
- application/json
|
||||
description: 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据
|
||||
parameters:
|
||||
- description: 测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)
|
||||
in: query
|
||||
name: token
|
||||
required: true
|
||||
type: string
|
||||
- description: 查询起始时间 (Unix时间戳, e.g., 1761008266)
|
||||
in: query
|
||||
name: begin
|
||||
required: true
|
||||
type: integer
|
||||
- description: 查询结束时间 (Unix时间戳, e.g., 1761526675)
|
||||
in: query
|
||||
name: end
|
||||
required: true
|
||||
type: integer
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: 返回实时数据成功
|
||||
schema:
|
||||
allOf:
|
||||
- $ref: '#/definitions/network.SuccessResponse'
|
||||
- properties:
|
||||
payload:
|
||||
$ref: '#/definitions/network.RealTimeDataPayload'
|
||||
type: object
|
||||
"400":
|
||||
description: 返回实时数据失败
|
||||
schema:
|
||||
$ref: '#/definitions/network.FailureResponse'
|
||||
summary: 获取实时测点数据
|
||||
tags:
|
||||
- RealTime Component
|
||||
/measurement/recommend:
|
||||
get:
|
||||
consumes:
|
||||
|
|
|
|||
|
|
@ -35,10 +35,12 @@ import (
|
|||
// }
|
||||
//
|
||||
// @Failure 400 {object} network.FailureResponse "返回推荐列表失败"
|
||||
// @Example 400 {
|
||||
// "code": 400,
|
||||
// "msg": "failed to get recommend data from redis",
|
||||
// }
|
||||
//
|
||||
// @Example 400 {
|
||||
// "code": 400,
|
||||
// "msg": "failed to get recommend data from redis",
|
||||
// }
|
||||
//
|
||||
// @Router /measurement/recommend [get]
|
||||
func MeasurementRecommendHandler(c *gin.Context) {
|
||||
var request network.MeasurementRecommendRequest
|
||||
|
|
@ -102,11 +104,6 @@ func MeasurementRecommendHandler(c *gin.Context) {
|
|||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "success",
|
||||
// PayLoad: map[string]any{
|
||||
// "input": request.Input,
|
||||
// "offset": finalOffset,
|
||||
// "recommended_list": resultRecommends,
|
||||
// },
|
||||
PayLoad: &network.MeasurementRecommendPayload{
|
||||
Input: request.Input,
|
||||
Offset: finalOffset,
|
||||
|
|
|
|||
|
|
@ -2,46 +2,79 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"net/url"
|
||||
|
||||
"modelRT/alert"
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// QueryRealTimeDataHandler define query real time data process API
|
||||
// @Summary 获取实时测点数据
|
||||
// @Description 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据
|
||||
// @Tags RealTime Component
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param token query string true "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)"
|
||||
// @Param begin query int true "查询起始时间 (Unix时间戳, e.g., 1761008266)"
|
||||
// @Param end query int true "查询结束时间 (Unix时间戳, e.g., 1761526675)"
|
||||
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeDataPayload} "返回实时数据成功"
|
||||
//
|
||||
// @Example 200 {
|
||||
// "code": 200,
|
||||
// "msg": "success",
|
||||
// "payload": {
|
||||
// "input": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
|
||||
// "sub_pos": [
|
||||
// {
|
||||
// "time": 1736305467506000000,
|
||||
// "value": 1
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Failure 400 {object} network.FailureResponse "返回实时数据失败"
|
||||
//
|
||||
// @Example 400 {
|
||||
// "code": 400,
|
||||
// "msg": "failed to get real time data from dataRT",
|
||||
// }
|
||||
//
|
||||
// @Router /data/realtime [get]
|
||||
func QueryRealTimeDataHandler(c *gin.Context) {
|
||||
token := c.Query("token")
|
||||
beginStr := c.Query("begin")
|
||||
begin, err := strconv.Atoi(beginStr)
|
||||
if err != nil {
|
||||
logger.Error(c, "convert begin param from string to int failed", "error", err)
|
||||
|
||||
resp := network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
endStr := c.Query("end")
|
||||
end, err := strconv.Atoi(endStr)
|
||||
if err != nil {
|
||||
logger.Error(c, "convert end param from string to int failed", "error", err)
|
||||
|
||||
resp := network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
// TODO 启动一个goroutine用来开启与dataRT服务的websocket服务,并使用channel 将数据传递回来,本地api中启动并维持与前端UI的websocket连接
|
||||
var transportChannel chan network.RealTimeDataPoint
|
||||
var closeChannel chan struct{}
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("token", token)
|
||||
params.Set("begin", beginStr)
|
||||
params.Set("end", endStr)
|
||||
go receiveRealTimeDataByWebSocket(c, params, transportChannel, closeChannel)
|
||||
|
||||
for {
|
||||
select {
|
||||
case data := <-transportChannel:
|
||||
fmt.Println("receive real time data:", data)
|
||||
case <-closeChannel:
|
||||
fmt.Println("websocket connection closed")
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
fmt.Println(token, begin, end)
|
||||
// TODO parse token to dataRT query params
|
||||
var level int
|
||||
var targetLevel constants.AlertLevel
|
||||
alertManger := alert.GetAlertMangerInstance()
|
||||
|
|
@ -57,3 +90,87 @@ func QueryRealTimeDataHandler(c *gin.Context) {
|
|||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, transportChannel chan network.RealTimeDataPoint, closeChannel chan struct{}) {
|
||||
serverURL := "ws://127.0.0.1:8888/ws/points"
|
||||
u, err := url.Parse(serverURL)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse url failed", "error", err)
|
||||
}
|
||||
|
||||
q := u.Query()
|
||||
for key, values := range params {
|
||||
for _, value := range values {
|
||||
q.Add(key, value)
|
||||
}
|
||||
}
|
||||
u.RawQuery = q.Encode()
|
||||
finalServerURL := u.String()
|
||||
|
||||
conn, resp, err := websocket.DefaultDialer.Dial(finalServerURL, nil)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "dialing websocket server failed", "error", err)
|
||||
if resp != nil {
|
||||
// TODO 优化错误判断
|
||||
log.Printf("HTTP Response Status: %s", resp.Status)
|
||||
}
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
msgType, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
// check if it is an expected shutdown error
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||
logger.Info(ctx, "connection closed normally")
|
||||
} else {
|
||||
logger.Error(ctx, "abnormal disconnection from websocket server", "err", err)
|
||||
}
|
||||
close(closeChannel)
|
||||
break
|
||||
}
|
||||
logger.Info(ctx, "received info from dataRT server", "msg_type", messageTypeToString(msgType), "message", string(message))
|
||||
// parse message by xxx struct
|
||||
var point network.RealTimeDataPoint
|
||||
js, err := simplejson.NewJson(message)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse real time data from message failed", "message", string(message), "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
time, err := js.Get("time").Int64()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse time data from message json info", "time", js.Get("time"), "err", err)
|
||||
continue
|
||||
}
|
||||
point.Time = time
|
||||
|
||||
value, err := js.Get("value").Float64()
|
||||
if err != nil {
|
||||
logger.Error(ctx, "parse value data from message json info", "value", js.Get("value"), "err", err)
|
||||
continue
|
||||
}
|
||||
point.Value = value
|
||||
transportChannel <- point
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// messageTypeToString define func of auxiliary to convert message type to string
|
||||
func messageTypeToString(t int) string {
|
||||
switch t {
|
||||
case websocket.TextMessage:
|
||||
return "TEXT"
|
||||
case websocket.BinaryMessage:
|
||||
return "BINARY"
|
||||
case websocket.PingMessage:
|
||||
return "PING"
|
||||
case websocket.PongMessage:
|
||||
return "PONG"
|
||||
case websocket.CloseMessage:
|
||||
return "CLOSE"
|
||||
default:
|
||||
return "UNKNOWN"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,13 +8,19 @@ type RealTimeDataReceiveRequest struct {
|
|||
|
||||
// RealTimeDataReceivePayload defines request payload of real time data receive api
|
||||
type RealTimeDataReceivePayload struct {
|
||||
ComponentUUID string `json:"component_uuid"`
|
||||
Point string `json:"point"`
|
||||
Values []RealTimeDataReceiveParam `json:"values"`
|
||||
ComponentUUID string `json:"component_uuid"`
|
||||
Point string `json:"point"`
|
||||
Values []RealTimeDataPoint `json:"values"`
|
||||
}
|
||||
|
||||
// RealTimeDataReceiveParam defines request param of real time data receive api
|
||||
type RealTimeDataReceiveParam struct {
|
||||
Time int64 `json:"time"`
|
||||
Value float64 `json:"value"`
|
||||
// RealTimeDataPoint define struct of real time data point
|
||||
type RealTimeDataPoint struct {
|
||||
Time int64 `json:"time" example:"1678886400"`
|
||||
Value float64 `json:"value" example:"123.1"`
|
||||
}
|
||||
|
||||
// RealTimeDataPayload define struct of real time data payload
|
||||
type RealTimeDataPayload struct {
|
||||
// TODO 增加example tag
|
||||
RealTimeDataPoints []RealTimeDataPoint `json:"sub_pos" swaggertype:"object"`
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue