diff --git a/database/query_component.go b/database/query_component.go index dca464a..8565cbd 100644 --- a/database/query_component.go +++ b/database/query_component.go @@ -43,8 +43,11 @@ func QueryComponentByUUID(ctx context.Context, tx *gorm.DB, uuid uuid.UUID) (orm // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + result := tx.WithContext(cancelCtx). + Where("global_uuid = ?", uuid). + Clauses(clause.Locking{Strength: "UPDATE"}). + First(&component) - result := tx.WithContext(cancelCtx).Where("global_uuid = ? ", uuid).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&component) if result.Error != nil { return orm.Component{}, result.Error } diff --git a/database/query_measurement.go b/database/query_measurement.go new file mode 100644 index 0000000..5b671de --- /dev/null +++ b/database/query_measurement.go @@ -0,0 +1,29 @@ +// Package database define database operation functions +package database + +import ( + "context" + "time" + + "modelRT/orm" + + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// QueryMeasurementByID return the result of query circuit diagram component measurement info by id from postgresDB +func QueryMeasurementByID(ctx context.Context, tx *gorm.DB, id int64) (orm.Measurement, error) { + var component orm.Measurement + // ctx超时判断 + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + result := tx.WithContext(cancelCtx). + Where("id = ?", id). + Clauses(clause.Locking{Strength: "UPDATE"}). + First(&component) + + if result.Error != nil { + return orm.Measurement{}, result.Error + } + return component, nil +} diff --git a/diagram/redis_zset.go b/diagram/redis_zset.go index 30dcd7a..75dfa6e 100644 --- a/diagram/redis_zset.go +++ b/diagram/redis_zset.go @@ -3,6 +3,8 @@ package diagram import ( "context" + "iter" + "maps" locker "modelRT/distributedlock" "modelRT/logger" @@ -18,8 +20,8 @@ type RedisZSet struct { } // NewRedisZSet define func of new redis zset instance -func NewRedisZSet(ctx context.Context, key string, token string, lockLeaseTime uint64, needRefresh bool) *RedisHash { - return &RedisHash{ +func NewRedisZSet(ctx context.Context, key string, token string, lockLeaseTime uint64, needRefresh bool) *RedisZSet { + return &RedisZSet{ ctx: ctx, rwLocker: locker.InitRWLocker(key, token, lockLeaseTime, needRefresh), storageClient: GetRedisClientInstance(), @@ -42,3 +44,80 @@ func (rs *RedisZSet) ZADD(setKey string, score float64, member interface{}) erro } return nil } + +// ZRANGE define func of returns the specified range of elements in the sorted set stored by key +func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error) { + var results []string + + err := rs.rwLocker.RLock(rs.ctx) + if err != nil { + logger.Error(rs.ctx, "lock RLock by setKey failed", "set_key", setKey, "error", err) + return nil, err + } + + defer func() { + err = rs.rwLocker.UnRLock(rs.ctx) + if err != nil { + logger.Error(rs.ctx, "unlock RLock by setKey failed", "set_key", setKey, "error", err) + } + }() + + results, err = rs.storageClient.ZRange(rs.ctx, setKey, start, stop).Result() + if err != nil { + logger.Error(rs.ctx, "range set by key failed", "set_key", setKey, "start", start, "stop", stop, "error", err) + return nil, err + } + return results, nil +} + +type Comparer[T any] interface { + Compare(T) int +} + +type ComparableComparer[T any] interface { + Compare(T) int + comparable // 直接嵌入 comparable 约束 +} + +type methodNode[E Comparer[E]] struct { + value E + left *methodNode[E] + right *methodNode[E] +} + +type MethodTree[E Comparer[E]] struct { + root *methodNode[E] +} + +type OrderedSet[E interface { + comparable + Comparer[E] +}] struct { + tree MethodTree[E] + elements map[E]bool +} + +type ComparableOrderedSet[E ComparableComparer[E]] struct { + tree MethodTree[E] + elements map[E]bool +} + +type Set[E any] interface { + Insert(E) + Delete(E) + Has(E) bool + All() iter.Seq[E] +} + +func InsertAll[E any](set Set[E], seq iter.Seq[E]) { + for v := range seq { + set.Insert(v) + } +} + +type HashSet[E comparable] map[E]bool + +func (s HashSet[E]) Insert(v E) { s[v] = true } +func (s HashSet[E]) Delete(v E) { delete(s, v) } +func (s HashSet[E]) Has(v E) bool { return s[v] } +func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) } diff --git a/handler/measurement_load.go b/handler/measurement_load.go index 1651999..34c033d 100644 --- a/handler/measurement_load.go +++ b/handler/measurement_load.go @@ -4,6 +4,7 @@ import ( "net/http" "modelRT/database" + "modelRT/diagram" "modelRT/logger" "modelRT/network" @@ -23,33 +24,46 @@ func MeasurementGetHandler(c *gin.Context) { return } - // TODO 增加 redis 数据读取步骤 - pgClient := database.GetPostgresDBClient() - tx := pgClient.Begin() - - attrModel, err := database.ParseAttrToken(c, tx, request.MeasurementToken) + // token 当前客户端的唯一标识(token),用于区分不同的客户端。 + zset := diagram.NewRedisZSet(c, request.MeasurementToken, "token", 10, false) + points, err := zset.ZRANGE(request.MeasurementToken, 0, -1) if err != nil { - tx.Rollback() - logger.Error(c, "Failed to parse attribute token", "attr_token", request.MeasurementToken, "error", err) + logger.Error(c, "failed to get measurement data from redis", "measurement_token", request.MeasurementToken, "error", err) c.JSON(http.StatusOK, network.FailureResponse{ - Code: http.StatusBadRequest, - Msg: err.Error(), - PayLoad: map[string]interface{}{"attr_token": request.MeasurementToken}, + Code: http.StatusInternalServerError, + Msg: err.Error(), + PayLoad: map[string]interface{}{ + "measurement_id": request.MeasurementID, + "measurement_token": request.MeasurementToken, + }, }) return } - tx.Commit() - // The GetAttrValue method is assumed to exist on the AttrModelInterface. - // You need to add this method to your attribute_model.go interface definition. - attrValue := attrModel.GetAttrValue() + pgClient := database.GetPostgresDBClient() + measurementInfo, err := database.QueryMeasurementByID(c, pgClient, request.MeasurementID) + if err != nil { + logger.Error(c, "failed to query measurement by id", "measurement_id", request.MeasurementID, "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + PayLoad: map[string]interface{}{ + "measurement_id": request.MeasurementID, + "measurement_token": request.MeasurementToken, + "measurement_value": points, + }, + }) + return + } c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", PayLoad: map[string]interface{}{ - "attr_token": request.MeasurementToken, - "attr_value": attrValue, + "measurement_id": request.MeasurementID, + "measurement_token": request.MeasurementToken, + "measurement_info": measurementInfo, + "measurement_value": points, }, }) }