optimize real time data query api
This commit is contained in:
parent
34684bd5f1
commit
cf880279e4
|
|
@ -0,0 +1,43 @@
|
|||
// Package errcode provides internal error definition and business error definition
|
||||
package errcode
|
||||
|
||||
var (
|
||||
// ErrProcessSuccess define variable to indicates request process success
|
||||
ErrProcessSuccess = newError(20000, "request process success")
|
||||
|
||||
// ErrInvalidToken define variable to provided token does not conform to the expected format (e.g., missing segments)
|
||||
ErrInvalidToken = newError(40001, "invalid token format")
|
||||
|
||||
// ErrCrossToken define variable to occurs when an update attempt involves multiple components, which is restricted by business logic
|
||||
ErrCrossToken = newError(40002, "cross-component update not allowed")
|
||||
|
||||
// ErrRetrieveFailed define variable to indicates a failure in fetching the project-to-table name mapping from the configuration.
|
||||
ErrRetrieveFailed = newError(40003, "retrieve table mapping failed")
|
||||
|
||||
// ErrFoundTargetFailed define variable to returned when the specific database table cannot be identified using the provided token info.
|
||||
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
||||
|
||||
// ErrDBQueryFailed define variable to represents a generic failure during a PostgreSQL SELECT or SCAN operation.
|
||||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||
|
||||
// ErrDBUpdateFailed define variable to represents a failure during a PostgreSQL UPDATE or SAVE operation.
|
||||
ErrDBUpdateFailed = newError(50002, "update postgres database data failed")
|
||||
|
||||
// ErrDBzeroAffectedRows define variable to occurs when a database operation executes successfully but modifies no records.
|
||||
ErrDBzeroAffectedRows = newError(50003, "zero affected rows")
|
||||
|
||||
// ErrBeginTxFailed indicates that the system failed to start a new PostgreSQL transaction.
|
||||
ErrBeginTxFailed = newError(50004, "begin postgres transaction failed")
|
||||
|
||||
// ErrCommitTxFailed indicates that the PostgreSQL transaction could not be committed successfully.
|
||||
ErrCommitTxFailed = newError(50005, "postgres database transaction commit failed")
|
||||
|
||||
// ErrCachedQueryFailed define variable to indicates an error occurred while attempting to fetch data from the Redis cache.
|
||||
ErrCachedQueryFailed = newError(60001, "query redis cached data failed")
|
||||
|
||||
// ErrCacheSyncWarn define variable to partial success state: the database was updated, but the subsequent Redis cache refresh failed.
|
||||
ErrCacheSyncWarn = newError(60002, "postgres database updated, but cache sync failed")
|
||||
|
||||
// ErrCacheQueryFailed define variable to indicates query cached data by token failed.
|
||||
ErrCacheQueryFailed = newError(60003, "query cached data by token failed")
|
||||
)
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
// Package errcode provides internal error definition and business error definition
|
||||
package errcode
|
||||
|
||||
var (
|
||||
ErrProcessSuccess = newError(20000, "token value update success")
|
||||
ErrInvalidToken = newError(40001, "invalid token format")
|
||||
ErrCrossToken = newError(40002, "cross-component update not allowed")
|
||||
ErrRetrieveFailed = newError(40003, "retrieve table mapping failed")
|
||||
ErrFoundTargetFailed = newError(40004, "found target table by token failed")
|
||||
ErrDBQueryFailed = newError(50001, "query postgres database data failed")
|
||||
ErrDBUpdateFailed = newError(50002, "update postgres database data failed")
|
||||
ErrDBzeroAffectedRows = newError(50003, "zero affected rows")
|
||||
ErrCachedQueryFailed = newError(60001, "query redis cached data failed")
|
||||
ErrCacheSyncWarn = newError(60002, "postgres database updated, but cache sync failed")
|
||||
)
|
||||
|
|
@ -28,7 +28,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
if tokens == "" {
|
||||
err := fmt.Errorf("tokens is missing from the path")
|
||||
logger.Error(c, "query tokens from path failed", "error", err, "url", c.Request.RequestURI)
|
||||
renderFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -61,7 +61,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
}
|
||||
for _, item := range items {
|
||||
if val, ok := cacheData[item.attributeName]; ok {
|
||||
queryResults[item.token] = queryResult{value: val}
|
||||
queryResults[item.token] = queryResult{err: errcode.ErrProcessSuccess, value: val}
|
||||
} else {
|
||||
dbQueryMap[item.attributeCompTag] = append(dbQueryMap[item.attributeCompTag], item)
|
||||
secondaryQueryCount++
|
||||
|
|
@ -70,15 +70,17 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
}
|
||||
|
||||
if secondaryQueryCount == 0 {
|
||||
renderQuerySuccess(c, queryResults, tokenSlice)
|
||||
payload := genQueryRespPayload(queryResults, tokenSlice)
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "query dynamic parameter values success", payload)
|
||||
return
|
||||
}
|
||||
|
||||
// enable transaction processing for secondary database queries
|
||||
tx := pgClient.WithContext(c).Begin()
|
||||
if tx.Error != nil {
|
||||
logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
|
||||
renderFailure(c, constants.RespCodeServerError, "begin postgres database transaction failed", nil)
|
||||
fillRemainingErrors(queryResults, tokenSlice, errcode.ErrBeginTxFailed)
|
||||
payload := genQueryRespPayload(queryResults, tokenSlice)
|
||||
renderRespFailure(c, constants.RespCodeServerError, "begin postgres database transaction failed", payload)
|
||||
return
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
|
@ -87,7 +89,9 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
compModelMap, err := database.QueryComponentByCompTags(c, tx, allCompTags)
|
||||
if err != nil {
|
||||
logger.Error(c, "query component info from postgres database failed", "error", err)
|
||||
renderFailure(c, constants.RespCodeServerError, "query component meta failed", nil)
|
||||
fillRemainingErrors(queryResults, tokenSlice, errcode.ErrDBQueryFailed)
|
||||
payload := genQueryRespPayload(queryResults, tokenSlice)
|
||||
renderRespFailure(c, constants.RespCodeServerError, "query component meta failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -113,7 +117,9 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
tableNameMap, err := database.BatchGetProjectNames(tx, identifiers)
|
||||
if err != nil {
|
||||
logger.Error(c, "batch get table names from postgres database failed", "error", err)
|
||||
renderFailure(c, constants.RespCodeServerError, "batch get table names from postgres database failed", nil)
|
||||
fillRemainingErrors(queryResults, tokenSlice, errcode.ErrRetrieveFailed)
|
||||
payload := genQueryRespPayload(queryResults, tokenSlice)
|
||||
renderRespFailure(c, constants.RespCodeServerError, "batch get table names from postgres database failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -137,7 +143,7 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
queryResults[item.token] = queryResult{value: dbVal}
|
||||
queryResults[item.token] = queryResult{err: errcode.ErrProcessSuccess, value: dbVal}
|
||||
item.attributeVal = dbVal
|
||||
hKey := fmt.Sprintf("%s_%s", item.attributeCompTag, item.attributeExtendType)
|
||||
redisSyncMap[hKey] = append(redisSyncMap[hKey], item)
|
||||
|
|
@ -145,23 +151,29 @@ func ComponentAttributeQueryHandler(c *gin.Context) {
|
|||
}
|
||||
|
||||
if err := tx.Commit().Error; err != nil {
|
||||
logger.Error(c, "postgres database transaction commit failed", "error", err)
|
||||
renderFailure(c, constants.RespCodeServerError, "postgres database transaction commit failed", nil)
|
||||
return
|
||||
logger.Warn(c, "postgres transaction commit failed, but returning scanned data", "error", err)
|
||||
} else {
|
||||
for hKey, items := range redisSyncMap {
|
||||
go backfillRedis(c.Copy(), hKey, items)
|
||||
}
|
||||
}
|
||||
|
||||
for hKey, items := range redisSyncMap {
|
||||
go backfillRedis(c.Copy(), hKey, items)
|
||||
}
|
||||
payload := genQueryRespPayload(queryResults, tokenSlice)
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", payload)
|
||||
}
|
||||
|
||||
renderQuerySuccess(c, queryResults, tokenSlice)
|
||||
func fillRemainingErrors(results map[string]queryResult, tokens []string, err *errcode.AppError) {
|
||||
for _, token := range tokens {
|
||||
if _, exists := results[token]; !exists {
|
||||
results[token] = queryResult{err: err}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func backfillRedis(ctx context.Context, hSetKey string, items []cacheQueryItem) {
|
||||
hset := diagram.NewRedisHash(ctx, hSetKey, 5000, false)
|
||||
fields := make(map[string]any, len(items))
|
||||
for _, item := range items {
|
||||
// 只回填有值的项
|
||||
if item.attributeVal != "" {
|
||||
fields[item.attributeName] = item.attributeVal
|
||||
}
|
||||
|
|
@ -176,6 +188,35 @@ func backfillRedis(ctx context.Context, hSetKey string, items []cacheQueryItem)
|
|||
}
|
||||
}
|
||||
|
||||
func genQueryRespPayload(queryResults map[string]queryResult, requestTokens []string) map[string]any {
|
||||
attributes := make([]attributeQueryResult, 0, len(requestTokens))
|
||||
|
||||
for _, token := range requestTokens {
|
||||
if queryResult, exists := queryResults[token]; exists {
|
||||
attributes = append(attributes, attributeQueryResult{
|
||||
Token: token,
|
||||
Code: queryResult.err.Code(),
|
||||
Msg: queryResult.err.Msg(),
|
||||
Value: queryResult.value,
|
||||
})
|
||||
} else {
|
||||
err := errcode.ErrCacheQueryFailed
|
||||
attributes = append(attributes, attributeQueryResult{
|
||||
Token: token,
|
||||
Code: err.Code(),
|
||||
Msg: err.Msg(),
|
||||
Value: "",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
payload := map[string]any{
|
||||
"attributes": attributes,
|
||||
}
|
||||
|
||||
return payload
|
||||
}
|
||||
|
||||
type cacheQueryItem struct {
|
||||
globalUUID uuid.UUID
|
||||
token string
|
||||
|
|
@ -186,6 +227,13 @@ type cacheQueryItem struct {
|
|||
attributeVal string
|
||||
}
|
||||
|
||||
type attributeQueryResult struct {
|
||||
Token string `json:"token"`
|
||||
Msg string `json:"msg"`
|
||||
Value string `json:"value"`
|
||||
Code int `json:"code"`
|
||||
}
|
||||
|
||||
type queryResult struct {
|
||||
err *errcode.AppError
|
||||
value string
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
var request network.ComponentAttributeUpdateInfo
|
||||
if err := c.ShouldBindJSON(&request); err != nil {
|
||||
logger.Error(c, "unmarshal request params failed", "error", err)
|
||||
renderFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
renderRespFailure(c, constants.RespCodeInvalidParams, err.Error(), nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
tx := pgClient.WithContext(c).Begin()
|
||||
if tx.Error != nil {
|
||||
logger.Error(c, "begin postgres transaction failed", "error", tx.Error)
|
||||
renderFailure(c, constants.RespCodeServerError, "begin postgres transaction failed", nil)
|
||||
renderRespFailure(c, constants.RespCodeServerError, "begin postgres transaction failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -73,8 +73,8 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
|
||||
tx.Rollback()
|
||||
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderFailure(c, constants.RespCodeFailed, "query component metadata failed", payload)
|
||||
payload := genUpdateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderRespFailure(c, constants.RespCodeFailed, "query component metadata failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -96,8 +96,8 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderFailure(c, constants.RespCodeFailed, "batch retrieve table names failed", payload)
|
||||
payload := genUpdateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderRespFailure(c, constants.RespCodeFailed, "batch retrieve table names failed", payload)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
|
||||
// commit transaction
|
||||
if err := tx.Commit().Error; err != nil {
|
||||
renderFailure(c, constants.RespCodeServerError, "transaction commit failed", nil)
|
||||
renderRespFailure(c, constants.RespCodeServerError, "transaction commit failed", nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +157,7 @@ func ComponentAttributeUpdateHandler(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
payload := generateRespPayload(updateResults, request.AttributeConfigs)
|
||||
payload := genUpdateRespPayload(updateResults, request.AttributeConfigs)
|
||||
renderRespSuccess(c, constants.RespCodeSuccess, "process completed", payload)
|
||||
}
|
||||
|
||||
|
|
@ -175,26 +175,26 @@ type cacheUpdateItem struct {
|
|||
newVal string
|
||||
}
|
||||
|
||||
type attributeResult struct {
|
||||
type attributeUpdateResult struct {
|
||||
Token string `json:"token"`
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
}
|
||||
|
||||
func generateRespPayload(updateResults map[string]*errcode.AppError, originalRequests []network.ComponentAttributeConfig) map[string]any {
|
||||
attributes := make([]attributeResult, 0, len(originalRequests))
|
||||
func genUpdateRespPayload(updateResults map[string]*errcode.AppError, originalRequests []network.ComponentAttributeConfig) map[string]any {
|
||||
attributes := make([]attributeUpdateResult, 0, len(originalRequests))
|
||||
|
||||
for _, req := range originalRequests {
|
||||
token := req.AttributeToken
|
||||
|
||||
if appErr, exists := updateResults[token]; exists {
|
||||
attributes = append(attributes, attributeResult{
|
||||
attributes = append(attributes, attributeUpdateResult{
|
||||
Token: token,
|
||||
Code: appErr.Code(),
|
||||
Msg: appErr.Msg(),
|
||||
})
|
||||
} else {
|
||||
attributes = append(attributes, attributeResult{
|
||||
attributes = append(attributes, attributeUpdateResult{
|
||||
Token: token,
|
||||
Code: constants.CodeSuccess,
|
||||
Msg: "token value update success",
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import (
|
|||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func renderFailure(c *gin.Context, code int, msg string, payload any) {
|
||||
func renderRespFailure(c *gin.Context, code int, msg string, payload any) {
|
||||
resp := network.FailureResponse{
|
||||
Code: code,
|
||||
Msg: msg,
|
||||
|
|
@ -30,11 +30,3 @@ func renderRespSuccess(c *gin.Context, code int, msg string, payload any) {
|
|||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func renderQuerySuccess(c *gin.Context, queryResult map[string]queryResult, payload any) {
|
||||
resp := network.SuccessResponse{}
|
||||
if payload != nil {
|
||||
resp.Payload = payload
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,6 @@ import (
|
|||
// registerComponentRoutes define func of register component routes
|
||||
func registerComponentRoutes(rg *gin.RouterGroup) {
|
||||
g := rg.Group("/component/")
|
||||
g.GET("attribute/get/:token", handler.ComponentAttributeQueryHandler)
|
||||
g.GET("attribute/get/:tokens", handler.ComponentAttributeQueryHandler)
|
||||
g.POST("attribute/update", handler.ComponentAttributeUpdateHandler)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue