optimize handler of component attribute query API
parent cceffa8219
commit d75b9a624c
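Resolve component attribute tokens cache-first and in batch: tokens are grouped by redis hash key, cache misses are collected into dbQueryMap, and the fallback now runs inside a single postgres transaction with two batched lookups (QueryComponentByCompTags for component metadata, BatchGetProjectNames for table names) instead of per-token queries. The redis cache is then backfilled asynchronously by the new backfillRedis goroutine, which is handed c.Copy() since a gin context must be copied before crossing a goroutine boundary. Debug fmt.Println leftovers are removed, and ErrFoundTargetFailed gets its own code (40004) instead of duplicating 40003. A stripped-down sketch of the new flow follows the diff below.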
@@ -6,7 +6,7 @@ var (
 	ErrInvalidToken       = newError(40001, "invalid token format")
 	ErrCrossToken         = newError(40002, "cross-component update not allowed")
 	ErrRetrieveFailed     = newError(40003, "retrieve table mapping failed")
-	ErrFoundTargetFailed  = newError(40003, "found target table by token failed")
+	ErrFoundTargetFailed  = newError(40004, "found target table by token failed")
 	ErrDBQueryFailed      = newError(50001, "query postgres database data failed")
 	ErrDBUpdateFailed     = newError(50002, "update postgres database data failed")
 	ErrDBzeroAffectedRows = newError(50002, "zero affected rows")
@@ -2,6 +2,7 @@
 package handler
 
 import (
+	"context"
 	"fmt"
 	"maps"
 	"slices"
@@ -33,8 +34,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
 
 	tokenSlice := strings.Split(tokens, ",")
 	queryResults := make(map[string]queryResult)
-	// TODO: optimize away attriQueryConfs and attributeComponentTag
-	cacheQueryMap := make(map[string][]cacheQueryItem, len(tokenSlice))
+	cacheQueryMap := make(map[string][]cacheQueryItem)
 
 	for _, token := range tokenSlice {
 		slices := strings.Split(token, ".")
@@ -42,153 +42,138 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
 			queryResults[token] = queryResult{err: errcode.ErrInvalidToken}
 			continue
 		}
 
 		hSetKey := fmt.Sprintf("%s_%s", slices[4], slices[5])
-		cacheQueryMap[hSetKey] = append(cacheQueryMap[hSetKey],
-			cacheQueryItem{
-				token:               token,
-				attributeCompTag:    slices[4],
-				attributeExtendType: slices[5],
-				attributeName:       slices[6],
-			},
-		)
+		cacheQueryMap[hSetKey] = append(cacheQueryMap[hSetKey], cacheQueryItem{
+			token:               token,
+			attributeCompTag:    slices[4],
+			attributeExtendType: slices[5],
+			attributeName:       slices[6],
+		})
 	}
 
-	var secondaryQueryCount int
 	dbQueryMap := make(map[string][]cacheQueryItem)
-	for hSetKey, queryItems := range cacheQueryMap {
+	var secondaryQueryCount int
+	for hSetKey, items := range cacheQueryMap {
 		hset := diagram.NewRedisHash(c, hSetKey, 5000, false)
 
-		cacheItems, err := hset.HGetAll()
+		cacheData, err := hset.HGetAll()
 		if err != nil {
-			logger.Error(c, "query redis cached data failed", "hash_key", hSetKey, "error", err)
-			for _, queryItem := range queryItems {
-				if _, exists := queryResults[queryItem.token]; exists {
-					queryResults[queryItem.token] = queryResult{err: errcode.ErrCachedQueryFailed.WithCause(err)}
-				}
-			}
+			logger.Warn(c, "redis hgetall failed", "key", hSetKey, "err", err)
 		}
 
-		for _, queryItem := range queryItems {
-			value, exists := cacheItems[queryItem.attributeName]
-			if !exists {
-				// TODO: add a secondary lookup that falls back to postgres
+		for _, item := range items {
+			if val, ok := cacheData[item.attributeName]; ok {
+				queryResults[item.token] = queryResult{value: val}
+			} else {
+				dbQueryMap[item.attributeCompTag] = append(dbQueryMap[item.attributeCompTag], item)
 				secondaryQueryCount++
-				dbQueryMap[queryItem.attributeCompTag] = append(dbQueryMap[queryItem.attributeCompTag], queryItem)
-				continue
 			}
-			queryResults[queryItem.token] = queryResult{value: value}
 		}
 	}
 
-	// open transaction
 	if secondaryQueryCount == 0 {
 		renderQuerySuccess(c, queryResults, tokenSlice)
 		return
 	}
 
+	// enable transaction processing for secondary database queries
 	tx := pgClient.WithContext(c).Begin()
 	if tx.Error != nil {
 		logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
-		renderFailure(c, constants.RespCodeServerError, "begin transaction failed", nil)
+		renderFailure(c, constants.RespCodeServerError, "begin postgres database transaction failed", nil)
 		return
 	}
+	defer tx.Rollback()
 
 	allCompTags := slices.Collect(maps.Keys(dbQueryMap))
 	compModelMap, err := database.QueryComponentByCompTags(c, tx, allCompTags)
 	if err != nil {
-		logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
-		renderFailure(c, constants.RespCodeServerError, "begin transaction failed", nil)
+		logger.Error(c, "query component info from postgres database failed", "error", err)
+		renderFailure(c, constants.RespCodeServerError, "query component meta failed", nil)
 		return
 	}
 
-	identifiers := make([]orm.ProjectIdentifier, secondaryQueryCount)
-	for compTag, queryItems := range dbQueryMap {
-		compInfo, exists := compModelMap[compTag]
-		if !exists {
-			// TODO: update queryResults for the queryItems under this compTag
-			fmt.Println(11111)
+	// batch retrieve component metadata
+	identifiers := make([]orm.ProjectIdentifier, 0, secondaryQueryCount)
+	for tag, items := range dbQueryMap {
+		comp, ok := compModelMap[tag]
+		if !ok {
+			for _, it := range items {
+				queryResults[it.token] = queryResult{err: errcode.ErrFoundTargetFailed}
+			}
 			continue
 		}
 
-		for index, queryItem := range queryItems {
+		for i := range items {
+			items[i].attributeModelName = comp.ModelName
+			items[i].globalUUID = comp.GlobalUUID
 			identifiers = append(identifiers, orm.ProjectIdentifier{
-				Token:     queryItem.token,
-				Tag:       queryItem.attributeCompTag,
-				MetaModel: compInfo.ModelName,
+				Token: items[i].token, Tag: comp.ModelName, MetaModel: items[i].attributeExtendType,
 			})
-			queryItems[index].attributeModelName = compInfo.ModelName
-			queryItems[index].globalUUID = compInfo.GlobalUUID
-			continue
 		}
 	}
 
 	tableNameMap, err := database.BatchGetProjectNames(tx, identifiers)
 	if err != nil {
-		tx.Rollback()
-
-		for _, id := range identifiers {
-			if _, exists := queryResults[id.Token]; !exists {
-				queryResults[id.Token] = queryResult{err: errcode.ErrRetrieveFailed.WithCause(err)}
-			}
-		}
-	}
-
-	redisUpdateMap := make(map[string][]cacheQueryItem)
-	for compTag, queryItems := range dbQueryMap {
-		fmt.Println(compTag, queryItems)
-		for _, queryItem := range queryItems {
-			fmt.Println(queryItem)
-			id := orm.ProjectIdentifier{Tag: queryItem.attributeModelName, MetaModel: queryItem.attributeExtendType}
-			tableName, exists := tableNameMap[id]
-			if !exists {
-				// TODO: optimize by checking whether the token exists first
-				queryResults[queryItem.token] = queryResult{err: errcode.ErrFoundTargetFailed}
-				continue
-			}
-
-			cacheValue := make(map[string]any)
-			result := tx.Table(tableName).Select(queryItem.attributeName).
-				Where("global_uuid = ?", queryItem.globalUUID).First(cacheValue)
-
-			if result.Error != nil {
-				queryResults[queryItem.token] = queryResult{err: errcode.ErrDBQueryFailed}
-				continue
-			}
-			if result.RowsAffected == 0 {
-				queryResults[queryItem.token] = queryResult{err: errcode.ErrDBzeroAffectedRows}
-				continue
-			}
-			// TODO: update the corresponding value in the redis hset
-			attributeValue := cacheValue[queryItem.attributeName].(string)
-			queryResults[queryItem.token] = queryResult{value: attributeValue}
-			queryItem.attributeVal = attributeValue
-			redisUpdateMap[queryItem.token] = append(redisUpdateMap[queryItem.token], queryItem)
-			continue
-		}
-
-	}
-
-	// commit transaction
-	if err := tx.Commit().Error; err != nil {
-		renderFailure(c, constants.RespCodeServerError, "transaction commit failed", nil)
+		logger.Error(c, "batch get table names from postgres database failed", "error", err)
+		renderFailure(c, constants.RespCodeServerError, "batch get table names from postgres database failed", nil)
 		return
 	}
 
-	for hSetKey, items := range redisUpdateMap {
-		hset := diagram.NewRedisHash(c, hSetKey, 5000, false)
-		fields := make(map[string]any, len(items))
-		for _, item := range items {
-			fields[item.attributeName] = item.attributeVal
-		}
-
-		if err := hset.SetRedisHashByMap(fields); err != nil {
-			logger.Error(c, "batch sync redis failed", "hash_key", hSetKey, "error", err)
-
-			for _, item := range items {
-				if _, exists := queryResults[item.token]; exists {
-					queryResults[item.token] = queryResult{err: errcode.ErrCacheSyncWarn.WithCause(err)}
-				}
-			}
-		}
-	}
-
-	// payload := generateRespPayload(queryResults, request.AttributeConfigs)
-	// renderRespSuccess(c, constants.RespCodeSuccess, "process completed", payload)
+	redisSyncMap := make(map[string][]cacheQueryItem)
+	for _, items := range dbQueryMap {
+		for _, item := range items {
+			if _, exists := queryResults[item.token]; exists {
+				continue
+			}
+
+			tbl, ok := tableNameMap[orm.ProjectIdentifier{Tag: item.attributeModelName, MetaModel: item.attributeExtendType}]
+			if !ok {
+				queryResults[item.token] = queryResult{err: errcode.ErrFoundTargetFailed}
+				continue
+			}
+
+			var dbVal string
+			res := tx.Table(tbl).Select(item.attributeName).Where("global_uuid = ?", item.globalUUID).Scan(&dbVal)
+			if res.Error != nil || res.RowsAffected == 0 {
+				queryResults[item.token] = queryResult{err: errcode.ErrDBQueryFailed}
+				continue
+			}
+
+			queryResults[item.token] = queryResult{value: dbVal}
+			item.attributeVal = dbVal
+			hKey := fmt.Sprintf("%s_%s", item.attributeCompTag, item.attributeExtendType)
+			redisSyncMap[hKey] = append(redisSyncMap[hKey], item)
+		}
+	}
+
+	if err := tx.Commit().Error; err != nil {
+		logger.Error(c, "postgres database transaction commit failed", "error", err)
+		renderFailure(c, constants.RespCodeServerError, "postgres database transaction commit failed", nil)
+		return
+	}
+
+	for hKey, items := range redisSyncMap {
+		go backfillRedis(c.Copy(), hKey, items)
+	}
+
+	renderQuerySuccess(c, queryResults, tokenSlice)
+}
+
+func backfillRedis(ctx context.Context, hSetKey string, items []cacheQueryItem) {
+	hset := diagram.NewRedisHash(ctx, hSetKey, 5000, false)
+	fields := make(map[string]any, len(items))
+	for _, item := range items {
+		// only backfill items that actually have a value
+		if item.attributeVal != "" {
+			fields[item.attributeName] = item.attributeVal
+		}
+	}
+
+	if len(fields) > 0 {
+		if err := hset.SetRedisHashByMap(fields); err != nil {
+			logger.Error(ctx, "async backfill redis failed", "hash_key", hSetKey, "error", err)
+		} else {
+			logger.Info(ctx, "async backfill redis success", "hash_key", hSetKey, "count", len(fields))
+		}
+	}
 }
 
 type cacheQueryItem struct {
@@ -30,3 +30,11 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
 	}
 	c.JSON(http.StatusOK, resp)
 }
+
+func renderQuerySuccess(c *gin.Context, queryResult map[string]queryResult, payload any) {
+	resp := network.SuccessResponse{}
+	if payload != nil {
+		resp.Payload = payload
+	}
+	c.JSON(http.StatusOK, resp)
+}
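Stripped of the redis and gorm plumbing, the handler's new shape is cache-first resolution with a batched fallback and a deferred backfill. A minimal, self-contained sketch of just that shape (every type and name below is a hypothetical stand-in, not this repo's API; the real handler keys misses by component tag and runs the fallback inside one postgres transaction):

package main

import "fmt"

// item loosely mirrors cacheQueryItem: one requested attribute of one component.
type item struct {
	token, hashKey, name, value string
}

// resolve answers every item from the cache first, batches the misses into a
// second pass against the store, and returns the resolved misses so a
// goroutine can backfill them into the cache afterwards.
func resolve(cache map[string]map[string]string, store map[string]string, items []item) (map[string]string, []item) {
	results := make(map[string]string)
	var misses, backfill []item
	for _, it := range items {
		if v, ok := cache[it.hashKey][it.name]; ok {
			results[it.token] = v // cache hit
			continue
		}
		misses = append(misses, it) // defer to the secondary query
	}
	for _, it := range misses { // batched fallback, one pass
		if v, ok := store[it.token]; ok {
			results[it.token] = v
			it.value = v
			backfill = append(backfill, it) // write back to the cache later
		}
	}
	return results, backfill
}

func main() {
	cache := map[string]map[string]string{"compA_ext": {"color": "red"}}
	store := map[string]string{"t2": "42"}
	items := []item{
		{token: "t1", hashKey: "compA_ext", name: "color"},
		{token: "t2", hashKey: "compA_ext", name: "size"},
	}
	results, backfill := resolve(cache, store, items)
	fmt.Println(results)                              // map[t1:red t2:42]
	fmt.Println(len(backfill), "item(s) to backfill") // 1 item(s) to backfill
}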