optimize measurement link api
This commit is contained in:
parent
3f70be0d1c
commit
f48807e5e5
|
|
@ -50,5 +50,8 @@ var ErrChanIsNil = errors.New("this channel is nil")
|
|||
// ErrConcurrentModify define error of concurrent modification detected
|
||||
var ErrConcurrentModify = errors.New("existed concurrent modification risk")
|
||||
|
||||
// ErrUnsupportedAction define error of unsupported real time data monitor action
|
||||
var ErrUnsupportedAction = errors.New("unsupported real time data monitor action")
|
||||
// ErrUnsupportedSubAction define error of unsupported real time data subscription action
|
||||
var ErrUnsupportedSubAction = errors.New("unsupported real time data subscription action")
|
||||
|
||||
// ErrUnsupportedLinkAction define error of unsupported measurement link process action
|
||||
var ErrUnsupportedLinkAction = errors.New("unsupported rmeasurement link process action")
|
||||
|
|
|
|||
|
|
@ -38,3 +38,10 @@ const (
|
|||
// RedisSpecCompTagMeasSetKey define redis set key which store all measurement keys under specific component tag
|
||||
RedisSpecCompTagMeasSetKey = "%s_measurement_keys"
|
||||
)
|
||||
|
||||
const (
|
||||
// SearchLinkAddAction define search link add action
|
||||
SearchLinkAddAction = "add"
|
||||
// SearchLinkDelAction define search link del action
|
||||
SearchLinkDelAction = "del"
|
||||
)
|
||||
|
|
@ -14,6 +14,7 @@ import (
|
|||
// RedisSet defines the encapsulation struct of redis hash type
|
||||
type RedisSet struct {
|
||||
ctx context.Context
|
||||
key string
|
||||
rwLocker *locker.RedissionRWLocker
|
||||
storageClient *redis.Client
|
||||
logger *zap.Logger
|
||||
|
|
@ -24,6 +25,7 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR
|
|||
token := ctx.Value("client_token").(string)
|
||||
return &RedisSet{
|
||||
ctx: ctx,
|
||||
key: setKey,
|
||||
rwLocker: locker.InitRWLocker(setKey, token, lockLeaseTime, needRefresh),
|
||||
storageClient: GetRedisClientInstance(),
|
||||
logger: logger.GetLoggerInstance(),
|
||||
|
|
@ -31,34 +33,34 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR
|
|||
}
|
||||
|
||||
// SADD define func of add redis set by members
|
||||
func (rs *RedisSet) SADD(setKey string, members ...interface{}) error {
|
||||
func (rs *RedisSet) SADD(members ...any) error {
|
||||
err := rs.rwLocker.WLock(rs.ctx)
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err)
|
||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err)
|
||||
return err
|
||||
}
|
||||
defer rs.rwLocker.UnWLock(rs.ctx)
|
||||
|
||||
err = rs.storageClient.SAdd(rs.ctx, setKey, members).Err()
|
||||
err = rs.storageClient.SAdd(rs.ctx, rs.key, members).Err()
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "add set by memebers failed", "set_key", setKey, "members", members, "error", err)
|
||||
logger.Error(rs.ctx, "add set by memebers failed", "set_key", rs.key, "members", members, "error", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SREM define func of remove the specified members from redis set by key
|
||||
func (rs *RedisSet) SREM(setKey string, members ...interface{}) error {
|
||||
func (rs *RedisSet) SREM(members ...any) error {
|
||||
err := rs.rwLocker.WLock(rs.ctx)
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err)
|
||||
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err)
|
||||
return err
|
||||
}
|
||||
defer rs.rwLocker.UnWLock(rs.ctx)
|
||||
|
||||
count, err := rs.storageClient.SRem(rs.ctx, setKey, members).Result()
|
||||
count, err := rs.storageClient.SRem(rs.ctx, rs.key, members).Result()
|
||||
if err != nil || count != int64(len(members)) {
|
||||
logger.Error(rs.ctx, "rem members from set failed", "set_key", setKey, "members", members, "error", err)
|
||||
logger.Error(rs.ctx, "rem members from set failed", "set_key", rs.key, "members", members, "error", err)
|
||||
|
||||
return fmt.Errorf("rem members from set failed:%w", err)
|
||||
}
|
||||
|
|
@ -66,27 +68,27 @@ func (rs *RedisSet) SREM(setKey string, members ...interface{}) error {
|
|||
}
|
||||
|
||||
// SMembers define func of get all memebers from redis set by key
|
||||
func (rs *RedisSet) SMembers(setKey string) ([]string, error) {
|
||||
func (rs *RedisSet) SMembers() ([]string, error) {
|
||||
err := rs.rwLocker.RLock(rs.ctx)
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", setKey, "error", err)
|
||||
logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", rs.key, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
defer rs.rwLocker.UnRLock(rs.ctx)
|
||||
|
||||
result, err := rs.storageClient.SMembers(rs.ctx, setKey).Result()
|
||||
result, err := rs.storageClient.SMembers(rs.ctx, rs.key).Result()
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err)
|
||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// SIsMember define func of determine whether an member is in set by key
|
||||
func (rs *RedisSet) SIsMember(setKey string, member interface{}) (bool, error) {
|
||||
result, err := rs.storageClient.SIsMember(rs.ctx, setKey, member).Result()
|
||||
func (rs *RedisSet) SIsMember(member any) (bool, error) {
|
||||
result, err := rs.storageClient.SIsMember(rs.ctx, rs.key, member).Result()
|
||||
if err != nil {
|
||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err)
|
||||
logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err)
|
||||
return false, err
|
||||
}
|
||||
return result, nil
|
||||
|
|
|
|||
|
|
@ -20,7 +20,8 @@ type RedisZSet struct {
|
|||
}
|
||||
|
||||
// NewRedisZSet define func of new redis zset instance
|
||||
func NewRedisZSet(ctx context.Context, key string, token string, lockLeaseTime uint64, needRefresh bool) *RedisZSet {
|
||||
func NewRedisZSet(ctx context.Context, key string, lockLeaseTime uint64, needRefresh bool) *RedisZSet {
|
||||
token := ctx.Value("client_token").(string)
|
||||
return &RedisZSet{
|
||||
ctx: ctx,
|
||||
rwLocker: locker.InitRWLocker(key, token, lockLeaseTime, needRefresh),
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func MeasurementGetHandler(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
zset := diagram.NewRedisZSet(c, request.MeasurementToken, clientToken, 0, false)
|
||||
zset := diagram.NewRedisZSet(c, request.MeasurementToken, 0, false)
|
||||
points, err := zset.ZRANGE(request.MeasurementToken, 0, -1)
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to get measurement data from redis", "measurement_token", request.MeasurementToken, "error", err)
|
||||
|
|
|
|||
|
|
@ -1,65 +0,0 @@
|
|||
// Package handler provides HTTP handlers for various endpoints.
|
||||
package handler
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/database"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// MeasurementLinkCreateHandler defines the measurement link creation api
|
||||
func MeasurementLinkCreateHandler(c *gin.Context) {
|
||||
var request network.MeasurementCreateRequest
|
||||
|
||||
clientToken := c.GetString("client_token")
|
||||
if clientToken == "" {
|
||||
err := constants.ErrGetClientToken
|
||||
logger.Error(c, "failed to get client token from context", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&request); err != nil {
|
||||
logger.Error(c, "failed to unmarshal measurement create request", "error", err)
|
||||
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: "Invalid request body format: " + err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
pgClient := database.GetPostgresDBClient()
|
||||
var createInfo network.MeasurementCreateInfo
|
||||
newMeasurementID, err := database.CreateMeasurement(c, pgClient, createInfo)
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to insert new measurement into postgres", "data", request.MeasurementData, "error", err)
|
||||
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
Msg: "Failed to create measurement record: " + err.Error(),
|
||||
Payload: map[string]any{
|
||||
"data_attempted": request.MeasurementData,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info(c, "successfully created new measurement record", "measurement_id", newMeasurementID)
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "Measurement created successfully",
|
||||
Payload: map[string]any{
|
||||
"measurement_id": newMeasurementID,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,136 @@
|
|||
// Package handler provides HTTP handlers for various endpoints.
|
||||
package handler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/database"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/network"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// MeasurementLinkHandler defines the measurement link process api
|
||||
func MeasurementLinkHandler(c *gin.Context) {
|
||||
var request network.MeasurementLinkRequest
|
||||
|
||||
clientToken := c.GetString("client_token")
|
||||
if clientToken == "" {
|
||||
err := constants.ErrGetClientToken
|
||||
logger.Error(c, "failed to get client token from context", "error", err)
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&request); err != nil {
|
||||
logger.Error(c, "failed to unmarshal measurement create request", "error", err)
|
||||
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: "Invalid request body format: " + err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
pgClient := database.GetPostgresDBClient()
|
||||
measurementID := request.MeasurementID
|
||||
action := request.Action
|
||||
measurementInfo, err := database.QueryMeasurementByID(c, pgClient, measurementID)
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to query measurement info by measurement id from postgres", "meauserement_id", measurementID, "error", err)
|
||||
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
Msg: "failed to query measurement info record: " + err.Error(),
|
||||
Payload: map[string]any{
|
||||
"id": measurementID,
|
||||
"action": action,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
componentInfo, err := database.QueryComponentByUUID(c, pgClient, measurementInfo.ComponentUUID)
|
||||
if err != nil {
|
||||
logger.Error(c, "failed to query component info by component uuid from postgres", "component_uuid", measurementInfo.ComponentUUID, "error", err)
|
||||
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
Msg: "failed to query component info record: " + err.Error(),
|
||||
Payload: map[string]any{
|
||||
"id": measurementID,
|
||||
"action": action,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
allMeasSet := diagram.NewRedisSet(c, constants.RedisAllMeasTagSetKey, 0, false)
|
||||
compMeasLinkKey := fmt.Sprintf(constants.RedisSpecCompTagMeasSetKey, componentInfo.Tag)
|
||||
compMeasLinkSet := diagram.NewRedisSet(c, compMeasLinkKey, 0, false)
|
||||
|
||||
switch action {
|
||||
case constants.SearchLinkAddAction:
|
||||
err1 := allMeasSet.SADD(measurementInfo.Tag)
|
||||
err2 := compMeasLinkSet.SADD(measurementInfo.Tag)
|
||||
err = processActionError(err1, err2, action)
|
||||
if err != nil {
|
||||
logger.Error(c, "add measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||
}
|
||||
case constants.SearchLinkDelAction:
|
||||
err1 := allMeasSet.SREM(measurementInfo.Tag)
|
||||
err2 := compMeasLinkSet.SREM(measurementInfo.Tag)
|
||||
err = processActionError(err1, err2, action)
|
||||
if err != nil {
|
||||
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||
}
|
||||
default:
|
||||
err = constants.ErrUnsupportedLinkAction
|
||||
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.JSON(http.StatusOK, network.FailureResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
Msg: err.Error(),
|
||||
Payload: map[string]any{
|
||||
"measurement_id": request.MeasurementID,
|
||||
"action": request.Action,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info(c, "process measurement link success", "measurement_id", measurementID, "action", request.Action)
|
||||
|
||||
c.JSON(http.StatusOK, network.SuccessResponse{
|
||||
Code: http.StatusOK,
|
||||
Msg: "measurement link process success",
|
||||
Payload: map[string]any{
|
||||
"measurement_id": measurementID,
|
||||
"action": request.Action,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func processActionError(err1, err2 error, action string) error {
|
||||
var err error
|
||||
if err1 != nil && err2 != nil {
|
||||
err = errors.Join(err1, err2)
|
||||
err = fmt.Errorf("process measurement link failed, allMeasSet %s operation and compMeasLinkSet %s operation failed: %w", action, action, err)
|
||||
} else if err1 != nil {
|
||||
err = fmt.Errorf("process measurement link failed: allMeasSet %s operation failed: %w", action, err1)
|
||||
} else {
|
||||
err = fmt.Errorf("process measurement link failed: compMeasLinkSet %s operation: %w", action, err2)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
@ -216,7 +216,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
|||
})
|
||||
return
|
||||
default:
|
||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action)
|
||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err)
|
||||
|
|
|
|||
|
|
@ -7,9 +7,13 @@ type MeasurementGetRequest struct {
|
|||
MeasurementToken string `json:"token" example:"some-token"`
|
||||
}
|
||||
|
||||
// MeasurementCreateRequest defines the request payload for create an measurement
|
||||
type MeasurementCreateRequest struct {
|
||||
MeasurementData map[string]any `json:"measurement_data" example:""`
|
||||
// MeasurementLinkRequest defines the request payload for process an measurement link
|
||||
type MeasurementLinkRequest struct {
|
||||
// required: true
|
||||
MeasurementID int64 `json:"measurement_id" example:"1001"`
|
||||
// required: true
|
||||
// enum: [add, del]
|
||||
Action string `json:"action" example:"add"`
|
||||
}
|
||||
|
||||
// MeasurementRecommendRequest defines the request payload for an measurement recommend
|
||||
|
|
|
|||
Loading…
Reference in New Issue