diff --git a/api.md b/api.md new file mode 100644 index 0000000..e739f74 --- /dev/null +++ b/api.md @@ -0,0 +1,115 @@ +# 接口协议 + +## 实时数据接口示例 + +### 开启实时数据的订阅 + +```json +{ + "action": "start", + "components": [ + { + "interval": "1", + "targets": [ + "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", + "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms" + ] + }, + { + "interval": "2", + "targets": [ + "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_C_rms" + ] + } + ] +} +``` + +### 实时数据订阅成功 + +```json +{ + "targets": [ + { + "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", + "code": "1001", + "msg": "subscription success" + }, + { + "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", + "code": "1002", + "msg": "subscription failed" + } + ] +} +``` + +### 实时数据的返回 + +```json +{ + "targets": [ + { + "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", + "datas": [ + { + "time": 1736305467506000000, + "value": 1 + }, + { + "time": 1736305467506000000, + "value": 1 + } + ] + }, + { + "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", + "datas": [ + { + "time": 1736305467506000000, + "value": 1 + }, + { + "time": 1736305467506000000, + "value": 1 + } + ] + } + ] +} +``` + +### 结束实时数据的获取 + +```json +{ + "action": "stop", + "targets": [ + "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", + "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms" + ] +} +``` + +## 实时数据状态值 + +| 动作描述 | 示例值 | +| :--- | :--- | +| 订阅成功 | 1001 | +| 订阅失败 | 1002 | +| 实时数据返回成功 | 1003 | +| 实时数据返回失败 | 1004 | +| 取消订阅成功 | 1005 | +| 取消订阅失败 | 1006 | + +## 实时数据标志 + +### 以设备语言中的type作为区分方式 + +| 标志描述 | 示例值 | +| :--- | :--- | +| 遥测 | TE | +| 遥信 | TI | +| 遥控 | TC | +| 遥调 | TA | +| 定值 | - | diff --git a/constants/error.go b/constants/error.go index 2fc06e9..6c7f60a 100644 --- a/constants/error.go +++ b/constants/error.go @@ -49,3 +49,6 @@ var ErrChanIsNil = errors.New("this channel is nil") // ErrConcurrentModify define error of concurrent modification detected var ErrConcurrentModify = errors.New("existed concurrent modification risk") + +// ErrUnsupportedAction define error of unsupported real time data monitor action +var ErrUnsupportedAction = errors.New("unsupported real time data monitor action") diff --git a/constants/monitor.go b/constants/monitor.go new file mode 100644 index 0000000..981bb76 --- /dev/null +++ b/constants/monitor.go @@ -0,0 +1,11 @@ +// Package constants define constant variable +package constants + +const ( + // MonitorStartAction define the real time monitor start action + MonitorStartAction string = "start" + // MonitorStopAction define the real time monitor stop action + MonitorStopAction string = "stop" + // MonitorAppendAction define the real time monitor append action + MonitorAppendAction string = "append" +) diff --git a/handler/real_time_data_monitor.go b/handler/real_time_data_monitor.go new file mode 100644 index 0000000..47fc38e --- /dev/null +++ b/handler/real_time_data_monitor.go @@ -0,0 +1,139 @@ +// Package handler provides HTTP handlers for various endpoints. +package handler + +import ( + "fmt" + "net/http" + "sync" + + "modelRT/constants" + "modelRT/logger" + "modelRT/network" + + "github.com/gin-gonic/gin" + "github.com/gofrs/uuid" +) + +var globalMonitorState *SharedMonitorState + +func init() { + globalMonitorState = NewSharedMonitorState() +} + +// RealTimeMonitorHandler define real time data monitor process API +// @Summary 开始或结束实时数据监控 +// @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于实时数据的监控 +// @Tags RealTime Component +// @Accept json +// @Produce json +// @Router /data/realtime [get] +func RealTimeMonitorHandler(c *gin.Context) { + var request network.RealTimeQueryRequest + var monitorAction string + var monitorID string + + if err := c.ShouldBindJSON(&request); err != nil { + logger.Error(c, "failed to unmarshal real time query request", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return + } + + if request.Action == constants.MonitorStartAction && request.MonitorID == "" { + monitorAction = request.Action + id, err := uuid.NewV4() + if err != nil { + logger.Error(c, "failed to generate monitor id", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return + } + monitorID = id.String() + } else if request.Action == constants.MonitorStartAction && request.MonitorID != "" { + monitorAction = constants.MonitorAppendAction + monitorID = request.MonitorID + } else if request.Action == constants.MonitorStopAction && request.MonitorID != "" { + monitorAction = request.Action + monitorID = request.MonitorID + } + + switch monitorAction { + case constants.MonitorStartAction: + fmt.Println(monitorID) + var config RealTimeMonitorConfig + for _, component := range request.Components { + fmt.Println(config) + globalMonitorState.Get(monitorID, component.Interval) + } + case constants.MonitorStopAction: + for _, component := range request.Components { + fmt.Println(component.Interval) + } + case constants.MonitorAppendAction: + for _, component := range request.Components { + fmt.Println(component.Interval) + } + default: + err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action) + logger.Error(c, "unsupported action of real time data monitor request", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return + } +} + +// RealTimeMonitorComponent define struct of real time monitor component +type RealTimeMonitorComponent struct { + interval string + targets []string +} + +// RealTimeMonitorConfig define struct of real time monitor config +type RealTimeMonitorConfig struct { + noticeChan chan struct{} + components map[string]*RealTimeMonitorComponent +} + +// SharedMonitorState define struct of shared monitor state with mutex +type SharedMonitorState struct { + monitorMap map[string]*RealTimeMonitorConfig + mutex sync.RWMutex +} + +// NewSharedMonitorState define function to create new SharedMonitorState +func NewSharedMonitorState() *SharedMonitorState { + return &SharedMonitorState{ + monitorMap: make(map[string]*RealTimeMonitorConfig), + } +} + +// Set defined function to set value in SharedMonitorState +func (s *SharedMonitorState) Set(key string, value *RealTimeMonitorConfig) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.monitorMap[key] = value +} + +// Get defined function to get value from SharedMonitorState +func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + config, ok := s.monitorMap[monitorID] + if !ok { + return nil, false + } + + component, ok := config.components[interval] + if !ok { + return nil, false + } + + return component.targets, true +} diff --git a/handler/real_time_data_query.go b/handler/real_time_data_query.go index f0a4610..40fbbd4 100644 --- a/handler/real_time_data_query.go +++ b/handler/real_time_data_query.go @@ -61,16 +61,10 @@ var wsUpgrader = websocket.Upgrader{ // // @Router /data/realtime [get] func QueryRealTimeDataHandler(c *gin.Context) { - req := c.Request - queryParams := req.URL.Query() - token := queryParams.Get("token") - beginStr := queryParams.Get("begin") - endStr := queryParams.Get("end") + var request network.RealTimeQueryRequest - if token == "" || beginStr == "" || endStr == "" { - err := fmt.Errorf("missing required parameters: token=%s, begin=%s, end=%s", token, beginStr, endStr) - - logger.Error(c, "failed to unmarshal measurement recommend request", "error", err) + if err := c.ShouldBindJSON(&request); err != nil { + logger.Error(c, "failed to unmarshal real time query request", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -78,8 +72,6 @@ func QueryRealTimeDataHandler(c *gin.Context) { return } - // TODO 增加对 token 的解析 - conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { logger.Error(c, "upgrade http protocol to websocket protocal failed", "error", err) @@ -87,16 +79,12 @@ func QueryRealTimeDataHandler(c *gin.Context) { } defer conn.Close() + fmt.Println(request) + // start a goroutine to open a websocket service with the dataRT service and use the channel to pass data back. Start and maintain the websocket connection with the front-end UI in the local api transportChannel := make(chan []any, 100) closeChannel := make(chan struct{}) - params := url.Values{} - params.Set("token", token) - params.Set("begin", beginStr) - params.Set("end", endStr) - go receiveRealTimeDataByWebSocket(c, params, transportChannel, closeChannel) - for { select { case data := <-transportChannel: diff --git a/network/real_time_data_query_request.go b/network/real_time_data_query_request.go new file mode 100644 index 0000000..b5fa13e --- /dev/null +++ b/network/real_time_data_query_request.go @@ -0,0 +1,20 @@ +// Package network define struct of network operation +package network + +// RealTimeQueryRequest define struct of real time data query request +type RealTimeQueryRequest struct { + // required: true + // enum: [start, stop] + Action string `json:"action" example:"start" description:"请求的操作,例如 start/stop"` + // TODO 增加monitorID的example值说明 + MonitorID string `json:"monitor_id" example:"xxxx" description:"用于标识不同client的监控请求ID"` + + // required: true + Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` +} + +// RealTimeComponentItem define struct of real time component item +type RealTimeComponentItem struct { + Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` + Targets []string `json:"targets" example:"[\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\"]" description:"需要采集数据的测点或标签名称列表"` +} diff --git a/network/real_time_data_request.go b/network/real_time_data_receive_request.go similarity index 100% rename from network/real_time_data_request.go rename to network/real_time_data_receive_request.go diff --git a/router/data.go b/router/data.go index b7d2564..39f743b 100644 --- a/router/data.go +++ b/router/data.go @@ -10,6 +10,7 @@ import ( // registerDataRoutes define func of register data routes func registerDataRoutes(rg *gin.RouterGroup) { g := rg.Group("/data/") - g.GET("realtime", handler.QueryRealTimeDataHandler) + // TODO 修改为ws路径 + // g.GET("realtime", handler.QueryRealTimeMonitorHandler) g.GET("history", handler.QueryHistoryDataHandler) } diff --git a/router/monitor.go b/router/monitor.go new file mode 100644 index 0000000..a1c3e84 --- /dev/null +++ b/router/monitor.go @@ -0,0 +1,14 @@ +// Package router provides router config +package router + +import ( + "modelRT/handler" + + "github.com/gin-gonic/gin" +) + +// registerMonitorRoutes define func of register monitordata routes +func registerMonitorRoutes(rg *gin.RouterGroup) { + g := rg.Group("/monitors/") + g.POST("realtime", handler.RealTimeMonitorHandler) +} diff --git a/router/router.go b/router/router.go index 6bb6be6..24dee32 100644 --- a/router/router.go +++ b/router/router.go @@ -24,4 +24,5 @@ func RegisterRoutes(engine *gin.Engine, clientToken string) { registerAttrRoutes(routeGroup) registerMeasurementRoutes(routeGroup, clientToken) registerDataRoutes(routeGroup) + registerMonitorRoutes(routeGroup) }