From f48807e5e512ef44c627600838763d11206e818a Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 8 Dec 2025 17:01:24 +0800 Subject: [PATCH] optimize measurement link api --- constants/error.go | 7 +- constants/{keys.go => search_keys.go} | 7 ++ diagram/redis_set.go | 32 +++--- diagram/redis_zset.go | 3 +- handler/measurement_load.go | 2 +- handler/mesurement_create.go | 65 ------------ handler/mesurement_link.go | 136 +++++++++++++++++++++++++ handler/real_time_data_subscription.go | 2 +- network/measurement_request.go | 10 +- 9 files changed, 176 insertions(+), 88 deletions(-) rename constants/{keys.go => search_keys.go} (90%) delete mode 100644 handler/mesurement_create.go create mode 100644 handler/mesurement_link.go diff --git a/constants/error.go b/constants/error.go index 6c7f60a..da0b91e 100644 --- a/constants/error.go +++ b/constants/error.go @@ -50,5 +50,8 @@ var ErrChanIsNil = errors.New("this channel is nil") // ErrConcurrentModify define error of concurrent modification detected var ErrConcurrentModify = errors.New("existed concurrent modification risk") -// ErrUnsupportedAction define error of unsupported real time data monitor action -var ErrUnsupportedAction = errors.New("unsupported real time data monitor action") +// ErrUnsupportedSubAction define error of unsupported real time data subscription action +var ErrUnsupportedSubAction = errors.New("unsupported real time data subscription action") + +// ErrUnsupportedLinkAction define error of unsupported measurement link process action +var ErrUnsupportedLinkAction = errors.New("unsupported rmeasurement link process action") diff --git a/constants/keys.go b/constants/search_keys.go similarity index 90% rename from constants/keys.go rename to constants/search_keys.go index a7ca94c..7bd0ab0 100644 --- a/constants/keys.go +++ b/constants/search_keys.go @@ -38,3 +38,10 @@ const ( // RedisSpecCompTagMeasSetKey define redis set key which store all measurement keys under specific component tag RedisSpecCompTagMeasSetKey = "%s_measurement_keys" ) + +const ( + // SearchLinkAddAction define search link add action + SearchLinkAddAction = "add" + // SearchLinkDelAction define search link del action + SearchLinkDelAction = "del" +) diff --git a/diagram/redis_set.go b/diagram/redis_set.go index 3cabdc7..bfb9f6c 100644 --- a/diagram/redis_set.go +++ b/diagram/redis_set.go @@ -14,6 +14,7 @@ import ( // RedisSet defines the encapsulation struct of redis hash type type RedisSet struct { ctx context.Context + key string rwLocker *locker.RedissionRWLocker storageClient *redis.Client logger *zap.Logger @@ -24,6 +25,7 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR token := ctx.Value("client_token").(string) return &RedisSet{ ctx: ctx, + key: setKey, rwLocker: locker.InitRWLocker(setKey, token, lockLeaseTime, needRefresh), storageClient: GetRedisClientInstance(), logger: logger.GetLoggerInstance(), @@ -31,34 +33,34 @@ func NewRedisSet(ctx context.Context, setKey string, lockLeaseTime uint64, needR } // SADD define func of add redis set by members -func (rs *RedisSet) SADD(setKey string, members ...interface{}) error { +func (rs *RedisSet) SADD(members ...any) error { err := rs.rwLocker.WLock(rs.ctx) if err != nil { - logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err) + logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err) return err } defer rs.rwLocker.UnWLock(rs.ctx) - err = rs.storageClient.SAdd(rs.ctx, setKey, members).Err() + err = rs.storageClient.SAdd(rs.ctx, rs.key, members).Err() if err != nil { - logger.Error(rs.ctx, "add set by memebers failed", "set_key", setKey, "members", members, "error", err) + logger.Error(rs.ctx, "add set by memebers failed", "set_key", rs.key, "members", members, "error", err) return err } return nil } // SREM define func of remove the specified members from redis set by key -func (rs *RedisSet) SREM(setKey string, members ...interface{}) error { +func (rs *RedisSet) SREM(members ...any) error { err := rs.rwLocker.WLock(rs.ctx) if err != nil { - logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err) + logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", rs.key, "error", err) return err } defer rs.rwLocker.UnWLock(rs.ctx) - count, err := rs.storageClient.SRem(rs.ctx, setKey, members).Result() + count, err := rs.storageClient.SRem(rs.ctx, rs.key, members).Result() if err != nil || count != int64(len(members)) { - logger.Error(rs.ctx, "rem members from set failed", "set_key", setKey, "members", members, "error", err) + logger.Error(rs.ctx, "rem members from set failed", "set_key", rs.key, "members", members, "error", err) return fmt.Errorf("rem members from set failed:%w", err) } @@ -66,27 +68,27 @@ func (rs *RedisSet) SREM(setKey string, members ...interface{}) error { } // SMembers define func of get all memebers from redis set by key -func (rs *RedisSet) SMembers(setKey string) ([]string, error) { +func (rs *RedisSet) SMembers() ([]string, error) { err := rs.rwLocker.RLock(rs.ctx) if err != nil { - logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", setKey, "error", err) + logger.Error(rs.ctx, "lock rLock by setKey failed", "set_key", rs.key, "error", err) return nil, err } defer rs.rwLocker.UnRLock(rs.ctx) - result, err := rs.storageClient.SMembers(rs.ctx, setKey).Result() + result, err := rs.storageClient.SMembers(rs.ctx, rs.key).Result() if err != nil { - logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err) + logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err) return nil, err } return result, nil } // SIsMember define func of determine whether an member is in set by key -func (rs *RedisSet) SIsMember(setKey string, member interface{}) (bool, error) { - result, err := rs.storageClient.SIsMember(rs.ctx, setKey, member).Result() +func (rs *RedisSet) SIsMember(member any) (bool, error) { + result, err := rs.storageClient.SIsMember(rs.ctx, rs.key, member).Result() if err != nil { - logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", setKey, "error", err) + logger.Error(rs.ctx, "get all set field by hash key failed", "set_key", rs.key, "error", err) return false, err } return result, nil diff --git a/diagram/redis_zset.go b/diagram/redis_zset.go index 75dfa6e..549d28b 100644 --- a/diagram/redis_zset.go +++ b/diagram/redis_zset.go @@ -20,7 +20,8 @@ type RedisZSet struct { } // NewRedisZSet define func of new redis zset instance -func NewRedisZSet(ctx context.Context, key string, token string, lockLeaseTime uint64, needRefresh bool) *RedisZSet { +func NewRedisZSet(ctx context.Context, key string, lockLeaseTime uint64, needRefresh bool) *RedisZSet { + token := ctx.Value("client_token").(string) return &RedisZSet{ ctx: ctx, rwLocker: locker.InitRWLocker(key, token, lockLeaseTime, needRefresh), diff --git a/handler/measurement_load.go b/handler/measurement_load.go index 134a3d9..065b9f5 100644 --- a/handler/measurement_load.go +++ b/handler/measurement_load.go @@ -38,7 +38,7 @@ func MeasurementGetHandler(c *gin.Context) { return } - zset := diagram.NewRedisZSet(c, request.MeasurementToken, clientToken, 0, false) + zset := diagram.NewRedisZSet(c, request.MeasurementToken, 0, false) points, err := zset.ZRANGE(request.MeasurementToken, 0, -1) if err != nil { logger.Error(c, "failed to get measurement data from redis", "measurement_token", request.MeasurementToken, "error", err) diff --git a/handler/mesurement_create.go b/handler/mesurement_create.go deleted file mode 100644 index ecd07c6..0000000 --- a/handler/mesurement_create.go +++ /dev/null @@ -1,65 +0,0 @@ -// Package handler provides HTTP handlers for various endpoints. -package handler - -import ( - "net/http" - - "modelRT/constants" - "modelRT/database" - "modelRT/logger" - "modelRT/network" - - "github.com/gin-gonic/gin" -) - -// MeasurementLinkCreateHandler defines the measurement link creation api -func MeasurementLinkCreateHandler(c *gin.Context) { - var request network.MeasurementCreateRequest - - clientToken := c.GetString("client_token") - if clientToken == "" { - err := constants.ErrGetClientToken - logger.Error(c, "failed to get client token from context", "error", err) - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), - }) - return - } - - if err := c.ShouldBindJSON(&request); err != nil { - logger.Error(c, "failed to unmarshal measurement create request", "error", err) - - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: "Invalid request body format: " + err.Error(), - }) - return - } - - pgClient := database.GetPostgresDBClient() - var createInfo network.MeasurementCreateInfo - newMeasurementID, err := database.CreateMeasurement(c, pgClient, createInfo) - if err != nil { - logger.Error(c, "failed to insert new measurement into postgres", "data", request.MeasurementData, "error", err) - - c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusInternalServerError, - Msg: "Failed to create measurement record: " + err.Error(), - Payload: map[string]any{ - "data_attempted": request.MeasurementData, - }, - }) - return - } - - logger.Info(c, "successfully created new measurement record", "measurement_id", newMeasurementID) - - c.JSON(http.StatusOK, network.SuccessResponse{ - Code: http.StatusOK, - Msg: "Measurement created successfully", - Payload: map[string]any{ - "measurement_id": newMeasurementID, - }, - }) -} diff --git a/handler/mesurement_link.go b/handler/mesurement_link.go new file mode 100644 index 0000000..664e6a1 --- /dev/null +++ b/handler/mesurement_link.go @@ -0,0 +1,136 @@ +// Package handler provides HTTP handlers for various endpoints. +package handler + +import ( + "errors" + "fmt" + "net/http" + + "modelRT/constants" + "modelRT/database" + "modelRT/diagram" + "modelRT/logger" + "modelRT/network" + + "github.com/gin-gonic/gin" +) + +// MeasurementLinkHandler defines the measurement link process api +func MeasurementLinkHandler(c *gin.Context) { + var request network.MeasurementLinkRequest + + clientToken := c.GetString("client_token") + if clientToken == "" { + err := constants.ErrGetClientToken + logger.Error(c, "failed to get client token from context", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return + } + + if err := c.ShouldBindJSON(&request); err != nil { + logger.Error(c, "failed to unmarshal measurement create request", "error", err) + + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: "Invalid request body format: " + err.Error(), + }) + return + } + + var err error + pgClient := database.GetPostgresDBClient() + measurementID := request.MeasurementID + action := request.Action + measurementInfo, err := database.QueryMeasurementByID(c, pgClient, measurementID) + if err != nil { + logger.Error(c, "failed to query measurement info by measurement id from postgres", "meauserement_id", measurementID, "error", err) + + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusInternalServerError, + Msg: "failed to query measurement info record: " + err.Error(), + Payload: map[string]any{ + "id": measurementID, + "action": action, + }, + }) + return + } + + componentInfo, err := database.QueryComponentByUUID(c, pgClient, measurementInfo.ComponentUUID) + if err != nil { + logger.Error(c, "failed to query component info by component uuid from postgres", "component_uuid", measurementInfo.ComponentUUID, "error", err) + + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusInternalServerError, + Msg: "failed to query component info record: " + err.Error(), + Payload: map[string]any{ + "id": measurementID, + "action": action, + }, + }) + return + } + + allMeasSet := diagram.NewRedisSet(c, constants.RedisAllMeasTagSetKey, 0, false) + compMeasLinkKey := fmt.Sprintf(constants.RedisSpecCompTagMeasSetKey, componentInfo.Tag) + compMeasLinkSet := diagram.NewRedisSet(c, compMeasLinkKey, 0, false) + + switch action { + case constants.SearchLinkAddAction: + err1 := allMeasSet.SADD(measurementInfo.Tag) + err2 := compMeasLinkSet.SADD(measurementInfo.Tag) + err = processActionError(err1, err2, action) + if err != nil { + logger.Error(c, "add measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err) + } + case constants.SearchLinkDelAction: + err1 := allMeasSet.SREM(measurementInfo.Tag) + err2 := compMeasLinkSet.SREM(measurementInfo.Tag) + err = processActionError(err1, err2, action) + if err != nil { + logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err) + } + default: + err = constants.ErrUnsupportedLinkAction + logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err) + } + + if err != nil { + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + Payload: map[string]any{ + "measurement_id": request.MeasurementID, + "action": request.Action, + }, + }) + return + } + + logger.Info(c, "process measurement link success", "measurement_id", measurementID, "action", request.Action) + + c.JSON(http.StatusOK, network.SuccessResponse{ + Code: http.StatusOK, + Msg: "measurement link process success", + Payload: map[string]any{ + "measurement_id": measurementID, + "action": request.Action, + }, + }) +} + +func processActionError(err1, err2 error, action string) error { + var err error + if err1 != nil && err2 != nil { + err = errors.Join(err1, err2) + err = fmt.Errorf("process measurement link failed, allMeasSet %s operation and compMeasLinkSet %s operation failed: %w", action, action, err) + } else if err1 != nil { + err = fmt.Errorf("process measurement link failed: allMeasSet %s operation failed: %w", action, err1) + } else { + err = fmt.Errorf("process measurement link failed: compMeasLinkSet %s operation: %w", action, err2) + } + return err +} diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 536f3c8..7faa3cd 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -216,7 +216,7 @@ func RealTimeSubHandler(c *gin.Context) { }) return default: - err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action) + err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action) logger.Error(c, "unsupported action of real time data subscription request", "error", err) requestTargetsCount := processRealTimeRequestCount(request.Measurements) results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, err) diff --git a/network/measurement_request.go b/network/measurement_request.go index 6ed99bc..3c0a003 100644 --- a/network/measurement_request.go +++ b/network/measurement_request.go @@ -7,9 +7,13 @@ type MeasurementGetRequest struct { MeasurementToken string `json:"token" example:"some-token"` } -// MeasurementCreateRequest defines the request payload for create an measurement -type MeasurementCreateRequest struct { - MeasurementData map[string]any `json:"measurement_data" example:""` +// MeasurementLinkRequest defines the request payload for process an measurement link +type MeasurementLinkRequest struct { + // required: true + MeasurementID int64 `json:"measurement_id" example:"1001"` + // required: true + // enum: [add, del] + Action string `json:"action" example:"add"` } // MeasurementRecommendRequest defines the request payload for an measurement recommend