fix bug of load data from postgres

This commit is contained in:
douxu 2025-01-09 15:56:40 +08:00
parent 655acf8e1e
commit c1691d4da2
6 changed files with 31 additions and 25 deletions

View File

@ -1,5 +1,5 @@
postgres:
host: "192.168.2.100"
host: "192.168.2.103"
port: 5432
database: "demo"
user: "postgres"

View File

@ -13,17 +13,18 @@ import (
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// QueryCircuitDiagramComponentFromDB return the result of query circuit diagram component info order by page id from postgresDB
func QueryCircuitDiagramComponentFromDB(ctx context.Context, pool *ants.PoolWithFunc, logger *zap.Logger) error {
func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool *ants.PoolWithFunc, logger *zap.Logger) error {
var Components []orm.Component
// ctx超时判断
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// TODO 将 for update 操作放到事务中
result := _globalPostgresClient.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components)
result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components)
if result.Error != nil {
logger.Error("query circuit diagram component info failed", zap.Error(result.Error))
return result.Error

View File

@ -8,17 +8,18 @@ import (
"modelRT/orm"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// QueryAllPages return the all page info of the circuit diagram query by grid_id and zone_id and station_id
func QueryAllPages(ctx context.Context, logger *zap.Logger, gridID, zoneID, stationID int64) ([]orm.Page, error) {
func QueryAllPages(ctx context.Context, tx *gorm.DB, logger *zap.Logger, gridID, zoneID, stationID int64) ([]orm.Page, error) {
var pages []orm.Page
// ctx超时判断
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// TODO 将 for update 操作放到事务中
result := _globalPostgresClient.Model(&orm.Page{}).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select(`"page".id, "page".Name, "page".status,"page".context`).Joins(`inner join "station" on "station".id = "page".station_id`).Joins(`inner join "zone" on "zone".id = "station".zone_id`).Joins(`inner join "grid" on "grid".id = "zone".grid_id`).Where(`"grid".id = ? and "zone".id = ? and "station".id = ?`, gridID, zoneID, stationID).Scan(&pages)
result := tx.Model(&orm.Page{}).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select(`"page".id, "page".Name, "page".status,"page".context`).Joins(`inner join "station" on "station".id = "page".station_id`).Joins(`inner join "zone" on "zone".id = "station".zone_id`).Joins(`inner join "grid" on "grid".id = "zone".grid_id`).Where(`"grid".id = ? and "zone".id = ? and "station".id = ?`, gridID, zoneID, stationID).Scan(&pages)
if result.Error != nil {
logger.Error("query circuit diagram pages by gridID and zoneID and stationID failed", zap.Int64("grid_id", gridID), zap.Int64("zone_id", zoneID), zap.Int64("station_id", stationID), zap.Error(result.Error))

View File

@ -11,17 +11,18 @@ import (
"github.com/gofrs/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// QueryTopologicByPageID return the topologic info of the circuit diagram query by pageID
func QueryTopologicByPageID(ctx context.Context, logger *zap.Logger, pageID int64) ([]orm.Topologic, error) {
func QueryTopologicByPageID(ctx context.Context, tx *gorm.DB, logger *zap.Logger, pageID int64) ([]orm.Topologic, error) {
var topologics []orm.Topologic
// ctx超时判断
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// TODO 将 for update 操作放到事务中
result := _globalPostgresClient.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Raw(sql.RecursiveSQL, pageID).Scan(&topologics)
result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Raw(sql.RecursiveSQL, pageID).Scan(&topologics)
if result.Error != nil {
logger.Error("query circuit diagram topologic info by pageID failed", zap.Int64("pageID", pageID), zap.Error(result.Error))
return nil, result.Error
@ -30,19 +31,20 @@ func QueryTopologicByPageID(ctx context.Context, logger *zap.Logger, pageID int6
}
// QueryTopologicFromDB return the result of query topologic info from postgresDB
func QueryTopologicFromDB(ctx context.Context, logger *zap.Logger, gridID, zoneID, stationID int64) error {
allPages, err := QueryAllPages(ctx, logger, gridID, zoneID, stationID)
func QueryTopologicFromDB(ctx context.Context, tx *gorm.DB, logger *zap.Logger, gridID, zoneID, stationID int64) error {
allPages, err := QueryAllPages(ctx, tx, logger, gridID, zoneID, stationID)
if err != nil {
logger.Error("query all pages info failed", zap.Int64("gridID", gridID), zap.Int64("zoneID", zoneID), zap.Int64("stationID", stationID), zap.Error(err))
return err
}
for _, page := range allPages {
topologicInfos, err := QueryTopologicByPageID(ctx, logger, page.ID)
topologicInfos, err := QueryTopologicByPageID(ctx, tx, logger, page.ID)
if err != nil {
logger.Error("query topologic info by pageID failed", zap.Int64("pageID", page.ID), zap.Error(err))
return err
}
err = InitCircuitDiagramTopologic(page.ID, topologicInfos)
if err != nil {
logger.Error("init topologic failed", zap.Error(err))

View File

@ -84,19 +84,22 @@ func main() {
defer cancel()
go realtimedata.DataPolling(cancelCtx, pollingPool)
postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres
err = database.QueryCircuitDiagramComponentFromDB(ctx, parsePool, zapLogger)
err := database.QueryCircuitDiagramComponentFromDB(ctx, tx, parsePool, zapLogger)
if err != nil {
zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err))
panic(err)
}
// TODO 暂时屏蔽完成 swagger 启动测试
err = database.QueryTopologicFromDB(ctx, zapLogger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID)
err = database.QueryTopologicFromDB(ctx, tx, zapLogger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID)
if err != nil {
zapLogger.Error("load topologic info from postgres failed", zap.Error(err))
panic(err)
}
return nil
})
// TODO 完成订阅数据分析
// TODO 暂时屏蔽完成 swagger 启动测试

View File

@ -70,7 +70,6 @@ var ParseFunc = func(parseConfig interface{}) {
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap)
diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName)
realtimedata.AnchorParamsChan <- anchorParam
}
}