Compare commits
2 Commits
a21a423624
...
f48807e5e5
| Author | SHA1 | Date |
|---|---|---|
|
|
f48807e5e5 | |
|
|
3f70be0d1c |
115
api.md
115
api.md
|
|
@ -1,115 +0,0 @@
|
||||||
# 接口协议
|
|
||||||
|
|
||||||
## 实时数据接口示例
|
|
||||||
|
|
||||||
### 开启实时数据的订阅
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"action": "start",
|
|
||||||
"components": [
|
|
||||||
{
|
|
||||||
"interval": "1",
|
|
||||||
"targets": [
|
|
||||||
"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
|
|
||||||
"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"interval": "2",
|
|
||||||
"targets": [
|
|
||||||
"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_C_rms"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 实时数据订阅成功
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"targets": [
|
|
||||||
{
|
|
||||||
"id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
|
|
||||||
"code": "1001",
|
|
||||||
"msg": "subscription success"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms",
|
|
||||||
"code": "1002",
|
|
||||||
"msg": "subscription failed"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 实时数据的返回
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"targets": [
|
|
||||||
{
|
|
||||||
"id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
|
|
||||||
"datas": [
|
|
||||||
{
|
|
||||||
"time": 1736305467506000000,
|
|
||||||
"value": 1
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"time": 1736305467506000000,
|
|
||||||
"value": 1
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms",
|
|
||||||
"datas": [
|
|
||||||
{
|
|
||||||
"time": 1736305467506000000,
|
|
||||||
"value": 1
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"time": 1736305467506000000,
|
|
||||||
"value": 1
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 结束实时数据的获取
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"action": "stop",
|
|
||||||
"targets": [
|
|
||||||
"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
|
|
||||||
"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## 实时数据状态值
|
|
||||||
|
|
||||||
| 动作描述 | 示例值 |
|
|
||||||
| :--- | :--- |
|
|
||||||
| 订阅成功 | 1001 |
|
|
||||||
| 订阅失败 | 1002 |
|
|
||||||
| 实时数据返回成功 | 1003 |
|
|
||||||
| 实时数据返回失败 | 1004 |
|
|
||||||
| 取消订阅成功 | 1005 |
|
|
||||||
| 取消订阅失败 | 1006 |
|
|
||||||
|
|
||||||
## 实时数据标志
|
|
||||||
|
|
||||||
### 以设备语言中的type作为区分方式
|
|
||||||
|
|
||||||
| 标志描述 | 示例值 |
|
|
||||||
| :--- | :--- |
|
|
||||||
| 遥测 | TE |
|
|
||||||
| 遥信 | TI |
|
|
||||||
| 遥控 | TC |
|
|
||||||
| 遥调 | TA |
|
|
||||||
| 定值 | - |
|
|
||||||
|
|
@ -50,5 +50,8 @@ var ErrChanIsNil = errors.New("this channel is nil")
|
||||||
// ErrConcurrentModify define error of concurrent modification detected
|
// ErrConcurrentModify define error of concurrent modification detected
|
||||||
var ErrConcurrentModify = errors.New("existed concurrent modification risk")
|
var ErrConcurrentModify = errors.New("existed concurrent modification risk")
|
||||||
|
|
||||||
// ErrUnsupportedAction define error of unsupported real time data monitor action
|
// ErrUnsupportedSubAction define error of unsupported real time data subscription action
|
||||||
var ErrUnsupportedAction = errors.New("unsupported real time data monitor action")
|
var ErrUnsupportedSubAction = errors.New("unsupported real time data subscription action")
|
||||||
|
|
||||||
|
// ErrUnsupportedLinkAction define error of unsupported measurement link process action
|
||||||
|
var ErrUnsupportedLinkAction = errors.New("unsupported rmeasurement link process action")
|
||||||
|
|
|
||||||
|
|
@ -29,3 +29,8 @@ const (
|
||||||
ChannelSuffixUBC = "UBC"
|
ChannelSuffixUBC = "UBC"
|
||||||
ChannelSuffixUCA = "UCA"
|
ChannelSuffixUCA = "UCA"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaxIdentifyHierarchy define max data indentify syntax hierarchy
|
||||||
|
MaxIdentifyHierarchy = 7
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -38,3 +38,10 @@ const (
|
||||||
// RedisSpecCompTagMeasSetKey define redis set key which store all measurement keys under specific component tag
|
// RedisSpecCompTagMeasSetKey define redis set key which store all measurement keys under specific component tag
|
||||||
RedisSpecCompTagMeasSetKey = "%s_measurement_keys"
|
RedisSpecCompTagMeasSetKey = "%s_measurement_keys"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// SearchLinkAddAction define search link add action
|
||||||
|
SearchLinkAddAction = "add"
|
||||||
|
// SearchLinkDelAction define search link del action
|
||||||
|
SearchLinkDelAction = "del"
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
// Package database define database operation functions
|
||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"modelRT/common/errcode"
|
||||||
|
"modelRT/network"
|
||||||
|
"modelRT/orm"
|
||||||
|
|
||||||
|
"github.com/gofrs/uuid"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CreateMeasurement define create measurement info of the circuit diagram into DB
|
||||||
|
func CreateMeasurement(ctx context.Context, tx *gorm.DB, measurementInfo network.MeasurementCreateInfo) (string, error) {
|
||||||
|
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
globalUUID, err := uuid.FromString(measurementInfo.UUID)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("format uuid from string type failed:%w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
measurement := orm.Measurement{
|
||||||
|
Tag: "",
|
||||||
|
Name: "",
|
||||||
|
Type: -1,
|
||||||
|
Size: -1,
|
||||||
|
DataSource: nil,
|
||||||
|
EventPlan: nil,
|
||||||
|
BayUUID: globalUUID,
|
||||||
|
ComponentUUID: globalUUID,
|
||||||
|
Op: -1,
|
||||||
|
Ts: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
result := tx.WithContext(cancelCtx).Create(&measurement)
|
||||||
|
if result.Error != nil || result.RowsAffected == 0 {
|
||||||
|
err := result.Error
|
||||||
|
if result.RowsAffected == 0 {
|
||||||
|
err = fmt.Errorf("%w:please check insert component slice", errcode.ErrInsertRowUnexpected)
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("insert component info failed:%w", err)
|
||||||
|
}
|
||||||
|
return strconv.FormatInt(measurement.ID, 10), nil
|
||||||
|
}
|
||||||
|
|
@ -64,7 +64,7 @@ func bulkInsertAllHierarchySets(ctx context.Context, rdb *redis.Client) error {
|
||||||
return fmt.Errorf("dynamic hierarchy insertion failed: %w", err)
|
return fmt.Errorf("dynamic hierarchy insertion failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("bulk insertion complete.")
|
log.Println("bulk insertion complete")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
// RedisSet defines the encapsulation struct of redis hash type
|
// RedisSet defines the encapsulation struct of redis hash type
|
||||||
type RedisSet struct {
|
type RedisSet struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
key string
|
||||||
rwLocker *locker.RedissionRWLocker
|
rwLocker *locker.RedissionRWLocker
|
||||||
storageClient *redis.Client
|
storageClient *redis.Client
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
|
@ -24,6 +25,7 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR
|
||||||
token := ctx.Value("client_token").(string)
|
token := ctx.Value("client_token").(string)
|
||||||
return &RedisSet{
|
return &RedisSet{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
key: setKey,
|
||||||
rwLocker: locker.InitRWLocker(setKey, token, lockLeaseTime, needRefresh),
|
rwLocker: locker.InitRWLocker(setKey, token, lockLeaseTime, needRefresh),
|
||||||
storageClient: GetRedisClientInstance(),
|
storageClient: GetRedisClientInstance(),
|
||||||
logger: logger.GetLoggerInstance(),
|
logger: logger.GetLoggerInstance(),
|
||||||
|
|
@ -31,34 +33,34 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR
|
||||||
}
|
}
|
||||||
|
|
||||||
// SADD define func of add redis set by members
|
// SADD define func of add redis set by members
|
||||||
func (rs *RedisSet) SADD(setKey string, members ...interface{}) error {
|
func (rs *RedisSet) SADD(members ...any) error {
|
||||||
err := rs.rwLocker.WLock(rs.ctx)
|
err := rs.rwLocker.WLock(rs.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err)
|
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer rs.rwLocker.UnWLock(rs.ctx)
|
defer rs.rwLocker.UnWLock(rs.ctx)
|
||||||
|
|
||||||
err = rs.storageClient.SAdd(rs.ctx, setKey, members).Err()
|
err = rs.storageClient.SAdd(rs.ctx, rs.key, members).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "add set by memebers failed", "set_key", setKey, "members", members, "error", err)
|
logger.Error(rs.ctx, "add set by memebers failed", "set_key", rs.key, "members", members, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SREM define func of remove the specified members from redis set by key
|
// SREM define func of remove the specified members from redis set by key
|
||||||
func (rs *RedisSet) SREM(setKey string, members ...interface{}) error {
|
func (rs *RedisSet) SREM(members ...any) error {
|
||||||
err := rs.rwLocker.WLock(rs.ctx)
|
err := rs.rwLocker.WLock(rs.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err)
|
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer rs.rwLocker.UnWLock(rs.ctx)
|
defer rs.rwLocker.UnWLock(rs.ctx)
|
||||||
|
|
||||||
count, err := rs.storageClient.SRem(rs.ctx, setKey, members).Result()
|
count, err := rs.storageClient.SRem(rs.ctx, rs.key, members).Result()
|
||||||
if err != nil || count != int64(len(members)) {
|
if err != nil || count != int64(len(members)) {
|
||||||
logger.Error(rs.ctx, "rem members from set failed", "set_key", setKey, "members", members, "error", err)
|
logger.Error(rs.ctx, "rem members from set failed", "set_key", rs.key, "members", members, "error", err)
|
||||||
|
|
||||||
return fmt.Errorf("rem members from set failed:%w", err)
|
return fmt.Errorf("rem members from set failed:%w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -66,27 +68,27 @@ func (rs *RedisSet) SREM(setKey string, members ...interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SMembers define func of get all memebers from redis set by key
|
// SMembers define func of get all memebers from redis set by key
|
||||||
func (rs *RedisSet) SMembers(setKey string) ([]string, error) {
|
func (rs *RedisSet) SMembers() ([]string, error) {
|
||||||
err := rs.rwLocker.RLock(rs.ctx)
|
err := rs.rwLocker.RLock(rs.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", setKey, "error", err)
|
logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", rs.key, "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rs.rwLocker.UnRLock(rs.ctx)
|
defer rs.rwLocker.UnRLock(rs.ctx)
|
||||||
|
|
||||||
result, err := rs.storageClient.SMembers(rs.ctx, setKey).Result()
|
result, err := rs.storageClient.SMembers(rs.ctx, rs.key).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err)
|
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SIsMember define func of determine whether an member is in set by key
|
// SIsMember define func of determine whether an member is in set by key
|
||||||
func (rs *RedisSet) SIsMember(setKey string, member interface{}) (bool, error) {
|
func (rs *RedisSet) SIsMember(member any) (bool, error) {
|
||||||
result, err := rs.storageClient.SIsMember(rs.ctx, setKey, member).Result()
|
result, err := rs.storageClient.SIsMember(rs.ctx, rs.key, member).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err)
|
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,8 @@ type RedisZSet struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRedisZSet define func of new redis zset instance
|
// NewRedisZSet define func of new redis zset instance
|
||||||
func NewRedisZSet(ctx context.Context, key string, token string, lockLeaseTime uint64, needRefresh bool) *RedisZSet {
|
func NewRedisZSet(ctx context.Context, key string, lockLeaseTime uint64, needRefresh bool) *RedisZSet {
|
||||||
|
token := ctx.Value("client_token").(string)
|
||||||
return &RedisZSet{
|
return &RedisZSet{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
rwLocker: locker.InitRWLocker(key, token, lockLeaseTime, needRefresh),
|
rwLocker: locker.InitRWLocker(key, token, lockLeaseTime, needRefresh),
|
||||||
|
|
|
||||||
|
|
@ -38,14 +38,14 @@ func MeasurementGetHandler(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
zset := diagram.NewRedisZSet(c, request.MeasurementToken, clientToken, 0, false)
|
zset := diagram.NewRedisZSet(c, request.MeasurementToken, 0, false)
|
||||||
points, err := zset.ZRANGE(request.MeasurementToken, 0, -1)
|
points, err := zset.ZRANGE(request.MeasurementToken, 0, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(c, "failed to get measurement data from redis", "measurement_token", request.MeasurementToken, "error", err)
|
logger.Error(c, "failed to get measurement data from redis", "measurement_token", request.MeasurementToken, "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
Msg: err.Error(),
|
Msg: err.Error(),
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]any{
|
||||||
"measurement_id": request.MeasurementID,
|
"measurement_id": request.MeasurementID,
|
||||||
"measurement_token": request.MeasurementToken,
|
"measurement_token": request.MeasurementToken,
|
||||||
},
|
},
|
||||||
|
|
@ -60,7 +60,7 @@ func MeasurementGetHandler(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
Msg: err.Error(),
|
Msg: err.Error(),
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]any{
|
||||||
"measurement_id": request.MeasurementID,
|
"measurement_id": request.MeasurementID,
|
||||||
"measurement_token": request.MeasurementToken,
|
"measurement_token": request.MeasurementToken,
|
||||||
"measurement_value": points,
|
"measurement_value": points,
|
||||||
|
|
@ -72,7 +72,7 @@ func MeasurementGetHandler(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
Msg: "success",
|
Msg: "success",
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]any{
|
||||||
"measurement_id": request.MeasurementID,
|
"measurement_id": request.MeasurementID,
|
||||||
"measurement_token": request.MeasurementToken,
|
"measurement_token": request.MeasurementToken,
|
||||||
"measurement_info": measurementInfo,
|
"measurement_info": measurementInfo,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,136 @@
|
||||||
|
// Package handler provides HTTP handlers for various endpoints.
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"modelRT/constants"
|
||||||
|
"modelRT/database"
|
||||||
|
"modelRT/diagram"
|
||||||
|
"modelRT/logger"
|
||||||
|
"modelRT/network"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MeasurementLinkHandler defines the measurement link process api
|
||||||
|
func MeasurementLinkHandler(c *gin.Context) {
|
||||||
|
var request network.MeasurementLinkRequest
|
||||||
|
|
||||||
|
clientToken := c.GetString("client_token")
|
||||||
|
if clientToken == "" {
|
||||||
|
err := constants.ErrGetClientToken
|
||||||
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.ShouldBindJSON(&request); err != nil {
|
||||||
|
logger.Error(c, "failed to unmarshal measurement create request", "error", err)
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: "Invalid request body format: " + err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
pgClient := database.GetPostgresDBClient()
|
||||||
|
measurementID := request.MeasurementID
|
||||||
|
action := request.Action
|
||||||
|
measurementInfo, err := database.QueryMeasurementByID(c, pgClient, measurementID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(c, "failed to query measurement info by measurement id from postgres", "meauserement_id", measurementID, "error", err)
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
Msg: "failed to query measurement info record: " + err.Error(),
|
||||||
|
Payload: map[string]any{
|
||||||
|
"id": measurementID,
|
||||||
|
"action": action,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
componentInfo, err := database.QueryComponentByUUID(c, pgClient, measurementInfo.ComponentUUID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(c, "failed to query component info by component uuid from postgres", "component_uuid", measurementInfo.ComponentUUID, "error", err)
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
Msg: "failed to query component info record: " + err.Error(),
|
||||||
|
Payload: map[string]any{
|
||||||
|
"id": measurementID,
|
||||||
|
"action": action,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
allMeasSet := diagram.NewRedisSet(c, constants.RedisAllMeasTagSetKey, 0, false)
|
||||||
|
compMeasLinkKey := fmt.Sprintf(constants.RedisSpecCompTagMeasSetKey, componentInfo.Tag)
|
||||||
|
compMeasLinkSet := diagram.NewRedisSet(c, compMeasLinkKey, 0, false)
|
||||||
|
|
||||||
|
switch action {
|
||||||
|
case constants.SearchLinkAddAction:
|
||||||
|
err1 := allMeasSet.SADD(measurementInfo.Tag)
|
||||||
|
err2 := compMeasLinkSet.SADD(measurementInfo.Tag)
|
||||||
|
err = processActionError(err1, err2, action)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(c, "add measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
|
}
|
||||||
|
case constants.SearchLinkDelAction:
|
||||||
|
err1 := allMeasSet.SREM(measurementInfo.Tag)
|
||||||
|
err2 := compMeasLinkSet.SREM(measurementInfo.Tag)
|
||||||
|
err = processActionError(err1, err2, action)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
err = constants.ErrUnsupportedLinkAction
|
||||||
|
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
Payload: map[string]any{
|
||||||
|
"measurement_id": request.MeasurementID,
|
||||||
|
"action": request.Action,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info(c, "process measurement link success", "measurement_id", measurementID, "action", request.Action)
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "measurement link process success",
|
||||||
|
Payload: map[string]any{
|
||||||
|
"measurement_id": measurementID,
|
||||||
|
"action": request.Action,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func processActionError(err1, err2 error, action string) error {
|
||||||
|
var err error
|
||||||
|
if err1 != nil && err2 != nil {
|
||||||
|
err = errors.Join(err1, err2)
|
||||||
|
err = fmt.Errorf("process measurement link failed, allMeasSet %s operation and compMeasLinkSet %s operation failed: %w", action, action, err)
|
||||||
|
} else if err1 != nil {
|
||||||
|
err = fmt.Errorf("process measurement link failed: allMeasSet %s operation failed: %w", action, err1)
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("process measurement link failed: compMeasLinkSet %s operation: %w", action, err2)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
@ -216,7 +216,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action)
|
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
||||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
||||||
|
|
|
||||||
6
main.go
6
main.go
|
|
@ -220,7 +220,7 @@ func main() {
|
||||||
go func() {
|
go func() {
|
||||||
<-done
|
<-done
|
||||||
if err := server.Shutdown(context.Background()); err != nil {
|
if err := server.Shutdown(context.Background()); err != nil {
|
||||||
logger.Error(ctx, "ShutdownServerError", "err", err)
|
logger.Error(ctx, "shutdown serverError", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
@ -229,10 +229,10 @@ func main() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == http.ErrServerClosed {
|
if err == http.ErrServerClosed {
|
||||||
// the service receives the shutdown signal normally and then closes
|
// the service receives the shutdown signal normally and then closes
|
||||||
logger.Info(ctx, "Server closed under request")
|
logger.Info(ctx, "server closed under request")
|
||||||
} else {
|
} else {
|
||||||
// abnormal shutdown of service
|
// abnormal shutdown of service
|
||||||
logger.Error(ctx, "Server closed unexpected", "err", err)
|
logger.Error(ctx, "server closed unexpected", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// start grid tagname fuzzy search
|
// start grid tagname fuzzy search
|
||||||
recommends, err := runFuzzySearch(ctx, gridSearchInput, inputSliceLen)
|
recommends, err := runFuzzySearch(ctx, gridSearchInput, "", inputSliceLen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "fuzzy search failed for level 1", "search_input", gridSearchInput, "error", err)
|
logger.Error(ctx, "fuzzy search failed for level 1", "search_input", gridSearchInput, "error", err)
|
||||||
return []string{}, false, err
|
return []string{}, false, err
|
||||||
|
|
@ -141,9 +141,10 @@ func getSpecificKeyByLength(inputLen int, input string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLevelFuzzySearch define func to process recommendation logic for specific levels(level >= 2)
|
// handleLevelFuzzySearch define func to process recommendation logic for specific levels(level >= 2)
|
||||||
func handleLevelFuzzySearch(ctx context.Context, rdb *redis.Client, level int, keySetKey string, inputSlice []string) ([]string, bool, error) {
|
func handleLevelFuzzySearch(ctx context.Context, rdb *redis.Client, hierarchy int, keySetKey string, inputSlice []string) ([]string, bool, error) {
|
||||||
searchInputIndex := level - 1
|
searchInputIndex := hierarchy - 1
|
||||||
searchInput := inputSlice[searchInputIndex]
|
searchInput := inputSlice[searchInputIndex]
|
||||||
|
searchPrefix := strings.Join(inputSlice[0:searchInputIndex], ".")
|
||||||
|
|
||||||
if searchInput == "" {
|
if searchInput == "" {
|
||||||
var specificalKey string
|
var specificalKey string
|
||||||
|
|
@ -152,11 +153,11 @@ func handleLevelFuzzySearch(ctx context.Context, rdb *redis.Client, level int, k
|
||||||
specificalKey = inputSlice[specificalKeyIndex]
|
specificalKey = inputSlice[specificalKeyIndex]
|
||||||
}
|
}
|
||||||
|
|
||||||
allResults, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, level, specificalKey)
|
allResults, isFuzzy, err := getKeyBySpecificsLevel(ctx, rdb, hierarchy, specificalKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []string{}, false, err
|
return []string{}, false, err
|
||||||
}
|
}
|
||||||
recommandResults := combineQueryResultByInput(level, inputSlice, allResults)
|
recommandResults := combineQueryResultByInput(hierarchy, inputSlice, allResults)
|
||||||
return recommandResults, isFuzzy, nil
|
return recommandResults, isFuzzy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -167,25 +168,28 @@ func handleLevelFuzzySearch(ctx context.Context, rdb *redis.Client, level int, k
|
||||||
}
|
}
|
||||||
|
|
||||||
if keyExists {
|
if keyExists {
|
||||||
|
if hierarchy == constants.MaxIdentifyHierarchy {
|
||||||
|
return []string{""}, false, nil
|
||||||
|
}
|
||||||
return []string{"."}, false, nil
|
return []string{"."}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// start redis fuzzy search
|
// start redis fuzzy search
|
||||||
recommends, err := runFuzzySearch(ctx, searchInput, level)
|
recommends, err := runFuzzySearch(ctx, searchInput, searchPrefix, hierarchy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "fuzzy search failed for level", "level", level, "search_input", searchInput, "error", err)
|
logger.Error(ctx, "fuzzy search failed by hierarchy", "hierarchy", hierarchy, "search_input", searchInput, "error", err)
|
||||||
return []string{}, false, err
|
return []string{}, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(recommends) == 0 {
|
if len(recommends) == 0 {
|
||||||
logger.Error(ctx, "fuzzy search without result", "level", level, "search_input", searchInput, "error", err)
|
logger.Error(ctx, "fuzzy search without result", "hierarchy", hierarchy, "search_input", searchInput, "error", err)
|
||||||
return []string{}, true, nil
|
return []string{}, true, nil
|
||||||
}
|
}
|
||||||
return combineQueryResultByInput(level, inputSlice, recommends), true, nil
|
return combineQueryResultByInput(hierarchy, inputSlice, recommends), true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// runFuzzySearch define func to process redis fuzzy search
|
// runFuzzySearch define func to process redis fuzzy search
|
||||||
func runFuzzySearch(ctx context.Context, searchInput string, inputSliceLen int) ([]string, error) {
|
func runFuzzySearch(ctx context.Context, searchInput string, searchPrefix string, inputSliceLen int) ([]string, error) {
|
||||||
searchInputLen := len(searchInput)
|
searchInputLen := len(searchInput)
|
||||||
|
|
||||||
for searchInputLen != 0 {
|
for searchInputLen != 0 {
|
||||||
|
|
@ -210,8 +214,24 @@ func runFuzzySearch(ctx context.Context, searchInput string, inputSliceLen int)
|
||||||
|
|
||||||
var recommends []string
|
var recommends []string
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
termSlice := strings.Split(result.Term, ".")
|
term := result.Term
|
||||||
if len(termSlice) <= inputSliceLen {
|
var termSliceLen int
|
||||||
|
var termPrefix string
|
||||||
|
|
||||||
|
lastDotIndex := strings.LastIndex(term, ".")
|
||||||
|
if lastDotIndex == -1 {
|
||||||
|
termPrefix = ""
|
||||||
|
} else {
|
||||||
|
termPrefix = term[:lastDotIndex]
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.Term == "" {
|
||||||
|
termSliceLen = 1
|
||||||
|
} else {
|
||||||
|
termSliceLen = strings.Count(result.Term, ".") + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if termSliceLen == inputSliceLen && termPrefix == searchPrefix {
|
||||||
recommends = append(recommends, result.Term)
|
recommends = append(recommends, result.Term)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ type TopologicUUIDCreateInfo struct {
|
||||||
Comment string `json:"comment"`
|
Comment string `json:"comment"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ComponentCreateInfo defines circuit diagram component create index info
|
// ComponentCreateInfo defines circuit diagram component create info
|
||||||
type ComponentCreateInfo struct {
|
type ComponentCreateInfo struct {
|
||||||
UUID string `json:"uuid"`
|
UUID string `json:"uuid"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
|
@ -37,6 +37,20 @@ type ComponentCreateInfo struct {
|
||||||
Op int `json:"op"`
|
Op int `json:"op"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MeasurementCreateInfo defines circuit diagram measurement create info
|
||||||
|
type MeasurementCreateInfo struct {
|
||||||
|
UUID string `json:"uuid"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Context map[string]any `json:"context"`
|
||||||
|
GridID int64 `json:"grid_id"`
|
||||||
|
ZoneID int64 `json:"zone_id"`
|
||||||
|
StationID int64 `json:"station_id"`
|
||||||
|
PageID int64 `json:"page_id"`
|
||||||
|
Tag string `json:"tag"`
|
||||||
|
Params string `json:"params"`
|
||||||
|
Op int `json:"op"`
|
||||||
|
}
|
||||||
|
|
||||||
// CircuitDiagramCreateRequest defines request params of circuit diagram create api
|
// CircuitDiagramCreateRequest defines request params of circuit diagram create api
|
||||||
type CircuitDiagramCreateRequest struct {
|
type CircuitDiagramCreateRequest struct {
|
||||||
PageID int64 `json:"page_id"`
|
PageID int64 `json:"page_id"`
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,15 @@ type MeasurementGetRequest struct {
|
||||||
MeasurementToken string `json:"token" example:"some-token"`
|
MeasurementToken string `json:"token" example:"some-token"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MeasurementLinkRequest defines the request payload for process an measurement link
|
||||||
|
type MeasurementLinkRequest struct {
|
||||||
|
// required: true
|
||||||
|
MeasurementID int64 `json:"measurement_id" example:"1001"`
|
||||||
|
// required: true
|
||||||
|
// enum: [add, del]
|
||||||
|
Action string `json:"action" example:"add"`
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementRecommendRequest defines the request payload for an measurement recommend
|
// MeasurementRecommendRequest defines the request payload for an measurement recommend
|
||||||
type MeasurementRecommendRequest struct {
|
type MeasurementRecommendRequest struct {
|
||||||
Input string `form:"input,omitempty" example:"grid1"`
|
Input string `form:"input,omitempty" example:"grid1"`
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue