add handler of compoent attribute query api
This commit is contained in:
parent
d1495b7ab8
commit
cceffa8219
|
|
@ -10,5 +10,6 @@ var (
|
|||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||
ErrDBUpdateFailed = newError(50002, "update postgres database data failed")
|
||||
ErrDBzeroAffectedRows = newError(50002, "zero affected rows")
|
||||
ErrCachedQueryFailed = newError(50003, "query redis cached data failed")
|
||||
ErrCacheSyncWarn = newError(60002, "postgres database updated, but cache sync failed")
|
||||
)
|
||||
|
|
|
|||
|
|
@ -58,10 +58,7 @@ func QueryComponentByUUID(ctx context.Context, tx *gorm.DB, uuid uuid.UUID) (orm
|
|||
// QueryComponentByCompTag return the result of query circuit diagram component info by component tag from postgresDB
|
||||
func QueryComponentByCompTag(ctx context.Context, tx *gorm.DB, tag string) (orm.Component, error) {
|
||||
var component orm.Component
|
||||
// ctx超时判断
|
||||
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
result := tx.WithContext(cancelCtx).
|
||||
result := tx.WithContext(ctx).
|
||||
Where("tag = ?", tag).
|
||||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||||
First(&component)
|
||||
|
|
@ -72,6 +69,29 @@ func QueryComponentByCompTag(ctx context.Context, tx *gorm.DB, tag string) (orm.
|
|||
return component, nil
|
||||
}
|
||||
|
||||
// QueryComponentByCompTags return the result of query circuit diagram component info by components tag from postgresDB
|
||||
func QueryComponentByCompTags(ctx context.Context, tx *gorm.DB, tags []string) (map[string]orm.Component, error) {
|
||||
if len(tags) == 0 {
|
||||
return make(map[string]orm.Component), nil
|
||||
}
|
||||
|
||||
var results []orm.Component
|
||||
err := tx.WithContext(ctx).
|
||||
Model(orm.Component{}).
|
||||
Select("global_uuid,tag, model_name").
|
||||
Where("tag IN ?", tags).
|
||||
Find(&results).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
compModelMap := make(map[string]orm.Component, len(results))
|
||||
for _, result := range results {
|
||||
compModelMap[result.Tag] = result
|
||||
}
|
||||
return compModelMap, nil
|
||||
}
|
||||
|
||||
// QueryComponentByPageID return the result of query circuit diagram component info by page id from postgresDB
|
||||
func QueryComponentByPageID(ctx context.Context, tx *gorm.DB, uuid uuid.UUID) (orm.Component, error) {
|
||||
var component orm.Component
|
||||
|
|
|
|||
|
|
@ -0,0 +1,207 @@
|
|||
// Package handler provides HTTP handlers for various endpoints.
|
||||
package handler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
|
||||
"modelRT/common/errcode"
|
||||
"modelRT/constants"
|
||||
"modelRT/database"
|
||||
"modelRT/diagram"
|
||||
"modelRT/logger"
|
||||
"modelRT/orm"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ComponentAttributeQueryHandler define circuit diagram component attribute value query process API
|
||||
func ComponentAttributeQueryHandler(c *gin.Context) {
|
||||
pgClient := database.GetPostgresDBClient()
|
||||
|
||||
tokens := c.Param("tokens")
|
||||
if tokens == "" {
|
||||
err := fmt.Errorf("tokens is missing from the path")
|
||||
logger.Error(c, "query tokens from path failed", "error", err, "url", c.Request.RequestURI)
|
||||
renderFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
tokenSlice := strings.Split(tokens, ",")
|
||||
queryResults := make(map[string]queryResult)
|
||||
// TODO 优化掉 attriQueryConfs 和 attributeComponentTag
|
||||
cacheQueryMap := make(map[string][]cacheQueryItem, len(tokenSlice))
|
||||
|
||||
for _, token := range tokenSlice {
|
||||
slices := strings.Split(token, ".")
|
||||
if len(slices) < 7 {
|
||||
queryResults[token] = queryResult{err: errcode.ErrInvalidToken}
|
||||
continue
|
||||
}
|
||||
|
||||
hSetKey := fmt.Sprintf("%s_%s", slices[4], slices[5])
|
||||
cacheQueryMap[hSetKey] = append(cacheQueryMap[hSetKey],
|
||||
cacheQueryItem{
|
||||
token: token,
|
||||
attributeCompTag: slices[4],
|
||||
attributeExtendType: slices[5],
|
||||
attributeName: slices[6],
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
var secondaryQueryCount int
|
||||
dbQueryMap := make(map[string][]cacheQueryItem)
|
||||
for hSetKey, queryItems := range cacheQueryMap {
|
||||
hset := diagram.NewRedisHash(c, hSetKey, 5000, false)
|
||||
|
||||
cacheItems, err := hset.HGetAll()
|
||||
if err != nil {
|
||||
logger.Error(c, "query redis cached data failed", "hash_key", hSetKey, "error", err)
|
||||
for _, queryItem := range queryItems {
|
||||
if _, exists := queryResults[queryItem.token]; exists {
|
||||
queryResults[queryItem.token] = queryResult{err: errcode.ErrCachedQueryFailed.WithCause(err)}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, queryItem := range queryItems {
|
||||
value, exists := cacheItems[queryItem.attributeName]
|
||||
if !exists {
|
||||
// TODO 增加二次查询流程,从 pg中尝试获取数据
|
||||
secondaryQueryCount++
|
||||
dbQueryMap[queryItem.attributeCompTag] = append(dbQueryMap[queryItem.attributeCompTag], queryItem)
|
||||
continue
|
||||
}
|
||||
queryResults[queryItem.token] = queryResult{value: value}
|
||||
}
|
||||
}
|
||||
|
||||
// open transaction
|
||||
tx := pgClient.WithContext(c).Begin()
|
||||
if tx.Error != nil {
|
||||
logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
|
||||
renderFailure(c, constants.RespCodeServerError, "begin transaction failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
allCompTags := slices.Collect(maps.Keys(dbQueryMap))
|
||||
compModelMap, err := database.QueryComponentByCompTags(c, tx, allCompTags)
|
||||
if err != nil {
|
||||
logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
|
||||
renderFailure(c, constants.RespCodeServerError, "begin transaction failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
identifiers := make([]orm.ProjectIdentifier, secondaryQueryCount)
|
||||
for compTag, queryItems := range dbQueryMap {
|
||||
compInfo, exists := compModelMap[compTag]
|
||||
if !exists {
|
||||
// TODO 根据compTag下queryItems的更新 queryResults
|
||||
fmt.Println(11111)
|
||||
continue
|
||||
}
|
||||
|
||||
for index, queryItem := range queryItems {
|
||||
identifiers = append(identifiers, orm.ProjectIdentifier{
|
||||
Token: queryItem.token,
|
||||
Tag: queryItem.attributeCompTag,
|
||||
MetaModel: compInfo.ModelName,
|
||||
})
|
||||
queryItems[index].attributeModelName = compInfo.ModelName
|
||||
queryItems[index].globalUUID = compInfo.GlobalUUID
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
tableNameMap, err := database.BatchGetProjectNames(tx, identifiers)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
|
||||
for _, id := range identifiers {
|
||||
if _, exists := queryResults[id.Token]; !exists {
|
||||
queryResults[id.Token] = queryResult{err: errcode.ErrRetrieveFailed.WithCause(err)}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
redisUpdateMap := make(map[string][]cacheQueryItem)
|
||||
for compTag, queryItems := range dbQueryMap {
|
||||
fmt.Println(compTag, queryItems)
|
||||
for _, queryItem := range queryItems {
|
||||
fmt.Println(queryItem)
|
||||
id := orm.ProjectIdentifier{Tag: queryItem.attributeModelName, MetaModel: queryItem.attributeExtendType}
|
||||
tableName, exists := tableNameMap[id]
|
||||
if !exists {
|
||||
// TODO 优化先判断token是否存在
|
||||
queryResults[queryItem.token] = queryResult{err: errcode.ErrFoundTargetFailed}
|
||||
continue
|
||||
}
|
||||
|
||||
cacheValue := make(map[string]any)
|
||||
result := tx.Table(tableName).Select(queryItem.attributeName).
|
||||
Where("global_uuid = ?", queryItem.globalUUID).First(cacheValue)
|
||||
|
||||
if result.Error != nil {
|
||||
queryResults[queryItem.token] = queryResult{err: errcode.ErrDBQueryFailed}
|
||||
continue
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
queryResults[queryItem.token] = queryResult{err: errcode.ErrDBzeroAffectedRows}
|
||||
continue
|
||||
}
|
||||
// TODO 更新对应的redis hset中的值
|
||||
attributeValue := cacheValue[queryItem.attributeName].(string)
|
||||
queryResults[queryItem.token] = queryResult{value: attributeValue}
|
||||
queryItem.attributeVal = attributeValue
|
||||
redisUpdateMap[queryItem.token] = append(redisUpdateMap[queryItem.token], queryItem)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// commit transaction
|
||||
if err := tx.Commit().Error; err != nil {
|
||||
renderFailure(c, constants.RespCodeServerError, "transaction commit failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
for hSetKey, items := range redisUpdateMap {
|
||||
hset := diagram.NewRedisHash(c, hSetKey, 5000, false)
|
||||
fields := make(map[string]any, len(items))
|
||||
for _, item := range items {
|
||||
fields[item.attributeName] = item.attributeVal
|
||||
}
|
||||
|
||||
if err := hset.SetRedisHashByMap(fields); err != nil {
|
||||
logger.Error(c, "batch sync redis failed", "hash_key", hSetKey, "error", err)
|
||||
|
||||
for _, item := range items {
|
||||
if _, exists := queryResults[item.token]; exists {
|
||||
queryResults[item.token] = queryResult{err: errcode.ErrCacheSyncWarn.WithCause(err)}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// payload := generateRespPayload(queryResults, request.AttributeConfigs)
|
||||
// renderRespSuccess(c, constants.RespCodeSuccess, "process completed", payload)
|
||||
}
|
||||
|
||||
type cacheQueryItem struct {
|
||||
globalUUID uuid.UUID
|
||||
token string
|
||||
attributeCompTag string
|
||||
attributeModelName string
|
||||
attributeExtendType string
|
||||
attributeName string
|
||||
attributeVal string
|
||||
}
|
||||
|
||||
type queryResult struct {
|
||||
err *errcode.AppError
|
||||
value string
|
||||
}
|
||||
|
|
@ -46,7 +46,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
|
||||
attriModifyConfs = append(attriModifyConfs, attributeModifyConfig{
|
||||
attributeToken: attribute.AttributeToken,
|
||||
attributeExtendName: slices[5],
|
||||
attributeExtendType: slices[5],
|
||||
attributeName: slices[6],
|
||||
attributeOldVal: attribute.AttributeOldVal,
|
||||
attributeNewVal: attribute.AttributeNewVal,
|
||||
|
|
@ -54,7 +54,13 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
}
|
||||
|
||||
// open transaction
|
||||
tx := pgClient.Begin()
|
||||
tx := pgClient.WithContext(c).Begin()
|
||||
if tx.Error != nil {
|
||||
logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
|
||||
renderFailure(c, constants.RespCodeServerError, "begin postgres transaction failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
compInfo, err := database.QueryComponentByCompTag(c, tx, attributeComponentTag)
|
||||
if err != nil {
|
||||
logger.Error(c, "query component info by component tag failed", "error", err, "tag", attributeComponentTag)
|
||||
|
|
@ -67,7 +73,8 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
|
||||
tx.Rollback()
|
||||
|
||||
renderFailure(c, constants.RespCodeFailed, "query component metadata failed", updateResults)
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderFailure(c, constants.RespCodeFailed, "query component metadata failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -76,7 +83,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
identifiers[i] = orm.ProjectIdentifier{
|
||||
Token: mod.attributeToken,
|
||||
Tag: compInfo.ModelName,
|
||||
MetaModel: mod.attributeExtendName,
|
||||
MetaModel: mod.attributeExtendType,
|
||||
}
|
||||
}
|
||||
tableNameMap, err := database.BatchGetProjectNames(tx, identifiers)
|
||||
|
|
@ -89,13 +96,14 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
renderFailure(c, constants.RespCodeFailed, "batch retrieve table names failed", updateResults)
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderFailure(c, constants.RespCodeFailed, "batch retrieve table names failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
redisUpdateMap := make(map[string][]cacheItem)
|
||||
redisUpdateMap := make(map[string][]cacheUpdateItem)
|
||||
for _, mod := range attriModifyConfs {
|
||||
id := orm.ProjectIdentifier{Tag: compInfo.ModelName, MetaModel: mod.attributeExtendName}
|
||||
id := orm.ProjectIdentifier{Tag: compInfo.ModelName, MetaModel: mod.attributeExtendType}
|
||||
tableName, exists := tableNameMap[id]
|
||||
if !exists {
|
||||
updateResults[mod.attributeToken] = errcode.ErrFoundTargetFailed
|
||||
|
|
@ -115,9 +123,9 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("%s_%s", attributeComponentTag, mod.attributeExtendName)
|
||||
cacheKey := fmt.Sprintf("%s_%s", attributeComponentTag, mod.attributeExtendType)
|
||||
redisUpdateMap[cacheKey] = append(redisUpdateMap[cacheKey],
|
||||
cacheItem{
|
||||
cacheUpdateItem{
|
||||
token: mod.attributeToken,
|
||||
name: mod.attributeName,
|
||||
newVal: mod.attributeNewVal,
|
||||
|
|
@ -130,13 +138,6 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
for key, items := range redisUpdateMap {
|
||||
hset := diagram.NewRedisHash(c, key, 5000, false)
|
||||
for _, item := range items {
|
||||
_ = hset.SetRedisHashByKV(item.name, item.newVal)
|
||||
}
|
||||
}
|
||||
|
||||
for key, items := range redisUpdateMap {
|
||||
hset := diagram.NewRedisHash(c, key, 5000, false)
|
||||
|
||||
|
|
@ -156,20 +157,54 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO 通过循环最初的 request 填充剩余处理正确token的updateResults结果
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", updateResults)
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", payload)
|
||||
}
|
||||
|
||||
type attributeModifyConfig struct {
|
||||
attributeToken string
|
||||
attributeExtendName string
|
||||
attributeExtendType string
|
||||
attributeName string
|
||||
attributeOldVal string
|
||||
attributeNewVal string
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
type cacheUpdateItem struct {
|
||||
token string
|
||||
name string
|
||||
newVal string
|
||||
}
|
||||
|
||||
type attributeResult struct {
|
||||
Token string `json:"token"`
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
}
|
||||
|
||||
func generateRespPayload(updateResults map[string]*errcode.AppError, originalRequests []network.ComponentAttributeConfig) map[string]any {
|
||||
attributes := make([]attributeResult, 0, len(originalRequests))
|
||||
|
||||
for _, req := range originalRequests {
|
||||
token := req.AttributeToken
|
||||
|
||||
if appErr, exists := updateResults[token]; exists {
|
||||
attributes = append(attributes, attributeResult{
|
||||
Token: token,
|
||||
Code: appErr.Code(),
|
||||
Msg: appErr.Msg(),
|
||||
})
|
||||
} else {
|
||||
attributes = append(attributes, attributeResult{
|
||||
Token: token,
|
||||
Code: constants.CodeSuccess,
|
||||
Msg: "token value update success",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
payload := map[string]any{
|
||||
"attributes": attributes,
|
||||
}
|
||||
|
||||
return payload
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,5 +10,6 @@ import (
|
|||
// registerComponentRoutes define func of register component routes
|
||||
func registerComponentRoutes(rg *gin.RouterGroup) {
|
||||
g := rg.Group("/component/")
|
||||
g.GET("attribute/get/:token", handler.ComponentAttributeQueryHandler)
|
||||
g.POST("attribute/update", handler.ComponentAttributeUpdateHandler)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue