2025-01-23 14:56:01 +08:00
// Package handler provides HTTP handlers for various endpoints.
package handler
import (
2025-10-27 16:47:04 +08:00
"context"
2025-01-23 14:56:01 +08:00
"net/http"
2025-10-27 16:47:04 +08:00
"net/url"
2025-10-28 16:59:16 +08:00
"time"
2025-01-23 14:56:01 +08:00
"modelRT/logger"
"modelRT/network"
2025-10-27 16:47:04 +08:00
"github.com/bitly/go-simplejson"
2025-01-23 14:56:01 +08:00
"github.com/gin-gonic/gin"
2025-10-27 16:47:04 +08:00
"github.com/gorilla/websocket"
2025-10-28 16:59:16 +08:00
jsoniter "github.com/json-iterator/go"
2025-01-23 14:56:01 +08:00
)
2025-10-28 16:59:16 +08:00
var wsUpgrader = websocket . Upgrader {
ReadBufferSize : 1024 ,
WriteBufferSize : 1024 ,
// CheckOrigin 必须返回 true, 否则浏览器会拒绝连接
CheckOrigin : func ( _ * http . Request ) bool {
// 在生产环境中,应该更严格地检查 Origin 头部
return true
} ,
}
2025-01-23 14:56:01 +08:00
// QueryRealTimeDataHandler define query real time data process API
2025-10-27 16:47:04 +08:00
// @Summary 获取实时测点数据
// @Description 根据用户输入的组件token,从 dataRT 服务中持续获取测点实时数据
// @Tags RealTime Component
// @Accept json
// @Produce json
// @Param token query string true "测量点唯一标识符 (e.g.grid_1:zone_1:station_1:transformfeeder1_220.I_A_rms)"
// @Param begin query int true "查询起始时间 (Unix时间戳, e.g., 1761008266)"
// @Param end query int true "查询结束时间 (Unix时间戳, e.g., 1761526675)"
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeDataPayload} "返回实时数据成功"
//
// @Example 200 {
// "code": 200,
// "msg": "success",
// "payload": {
// "input": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
// "sub_pos": [
// {
// "time": 1736305467506000000,
// "value": 1
// }
// ]
// }
// }
//
// @Failure 400 {object} network.FailureResponse "返回实时数据失败"
//
// @Example 400 {
// "code": 400,
// "msg": "failed to get real time data from dataRT",
// }
//
// @Router /data/realtime [get]
2025-01-23 14:56:01 +08:00
func QueryRealTimeDataHandler ( c * gin . Context ) {
2025-11-03 17:35:03 +08:00
var request network . RealTimeQueryRequest
2025-10-28 16:59:16 +08:00
2025-11-03 17:35:03 +08:00
if err := c . ShouldBindJSON ( & request ) ; err != nil {
logger . Error ( c , "failed to unmarshal real time query request" , "error" , err )
2025-10-28 16:59:16 +08:00
c . JSON ( http . StatusOK , network . FailureResponse {
Code : http . StatusBadRequest ,
Msg : err . Error ( ) ,
} )
return
}
2025-01-23 14:56:01 +08:00
2025-10-28 16:59:16 +08:00
conn , err := wsUpgrader . Upgrade ( c . Writer , c . Request , nil )
if err != nil {
logger . Error ( c , "upgrade http protocol to websocket protocal failed" , "error" , err )
return
}
defer conn . Close ( )
// start a goroutine to open a websocket service with the dataRT service and use the channel to pass data back. Start and maintain the websocket connection with the front-end UI in the local api
transportChannel := make ( chan [ ] any , 100 )
closeChannel := make ( chan struct { } )
2025-10-27 16:47:04 +08:00
for {
select {
case data := <- transportChannel :
2025-10-28 16:59:16 +08:00
respByte , err := jsoniter . Marshal ( data )
if err != nil {
logger . Error ( c , "marshal real time data to bytes failed" , "error" , err )
continue
}
err = conn . WriteMessage ( websocket . TextMessage , respByte )
if err != nil {
logger . Error ( c , "write message to websocket connection failed" , "error" , err )
continue
}
2025-10-27 16:47:04 +08:00
case <- closeChannel :
2025-10-28 16:59:16 +08:00
logger . Info ( c , "data receiving goroutine has been closed" )
// TODO 优化时间控制
deadline := time . Now ( ) . Add ( 5 * time . Second )
err := conn . WriteControl ( websocket . CloseMessage , websocket . FormatCloseMessage ( websocket . CloseNormalClosure , "the session ended normally" ) , deadline )
if err != nil {
logger . Error ( c , "sending close control message failed" , "error" , err )
}
// gracefully close session processing
err = conn . Close ( )
if err != nil {
logger . Error ( c , "websocket conn closed failed" , "error" , err )
}
logger . Info ( c , "websocket connection closed successfully." )
2025-01-23 14:56:01 +08:00
}
}
}
2025-10-27 16:47:04 +08:00
2025-10-28 16:59:16 +08:00
// receiveRealTimeDataByWebSocket define func of receive real time data by websocket
func receiveRealTimeDataByWebSocket ( ctx context . Context , params url . Values , transportChannel chan [ ] any , closeChannel chan struct { } ) {
2025-10-27 16:47:04 +08:00
serverURL := "ws://127.0.0.1:8888/ws/points"
u , err := url . Parse ( serverURL )
if err != nil {
logger . Error ( ctx , "parse url failed" , "error" , err )
}
q := u . Query ( )
for key , values := range params {
for _ , value := range values {
q . Add ( key , value )
}
}
u . RawQuery = q . Encode ( )
finalServerURL := u . String ( )
conn , resp , err := websocket . DefaultDialer . Dial ( finalServerURL , nil )
if err != nil {
logger . Error ( ctx , "dialing websocket server failed" , "error" , err )
if resp != nil {
2025-10-28 16:59:16 +08:00
logger . Error ( ctx , "websocket server response" , "status" , resp . Status )
2025-10-27 16:47:04 +08:00
}
return
}
defer conn . Close ( )
for {
msgType , message , err := conn . ReadMessage ( )
if err != nil {
// check if it is an expected shutdown error
if websocket . IsCloseError ( err , websocket . CloseNormalClosure , websocket . CloseGoingAway ) {
logger . Info ( ctx , "connection closed normally" )
} else {
logger . Error ( ctx , "abnormal disconnection from websocket server" , "err" , err )
}
close ( closeChannel )
break
}
logger . Info ( ctx , "received info from dataRT server" , "msg_type" , messageTypeToString ( msgType ) , "message" , string ( message ) )
2025-10-28 16:59:16 +08:00
2025-10-27 16:47:04 +08:00
js , err := simplejson . NewJson ( message )
if err != nil {
logger . Error ( ctx , "parse real time data from message failed" , "message" , string ( message ) , "err" , err )
continue
}
2025-10-28 16:59:16 +08:00
subPoss , err := js . Get ( "sub_pos" ) . Array ( )
2025-10-27 16:47:04 +08:00
if err != nil {
2025-10-28 16:59:16 +08:00
logger . Error ( ctx , "parse sub_pos struct from message json info" , "sub_pos" , js . Get ( "sub_pos" ) , "err" , err )
2025-10-27 16:47:04 +08:00
continue
}
2025-10-28 16:59:16 +08:00
transportChannel <- subPoss
2025-10-27 16:47:04 +08:00
}
return
}
// messageTypeToString define func of auxiliary to convert message type to string
func messageTypeToString ( t int ) string {
switch t {
case websocket . TextMessage :
return "TEXT"
case websocket . BinaryMessage :
return "BINARY"
case websocket . PingMessage :
return "PING"
case websocket . PongMessage :
return "PONG"
case websocket . CloseMessage :
return "CLOSE"
default :
return "UNKNOWN"
}
}