From c1691d4da28ea323d8b2624fd28cd08abeb72bb2 Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 9 Jan 2025 15:56:40 +0800 Subject: [PATCH] fix bug of load data from postgres --- config/config.yaml | 2 +- database/query_component.go | 5 +++-- database/query_page.go | 7 ++++--- database/query_topologic.go | 14 ++++++++------ main.go | 27 +++++++++++++++------------ pool/concurrency_model_parse.go | 1 - 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 693ca3c..a42f9af 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,5 +1,5 @@ postgres: - host: "192.168.2.100" + host: "192.168.2.103" port: 5432 database: "demo" user: "postgres" diff --git a/database/query_component.go b/database/query_component.go index dcc8561..8ee154b 100644 --- a/database/query_component.go +++ b/database/query_component.go @@ -13,17 +13,18 @@ import ( "github.com/panjf2000/ants/v2" "go.uber.org/zap" + "gorm.io/gorm" "gorm.io/gorm/clause" ) // QueryCircuitDiagramComponentFromDB return the result of query circuit diagram component info order by page id from postgresDB -func QueryCircuitDiagramComponentFromDB(ctx context.Context, pool *ants.PoolWithFunc, logger *zap.Logger) error { +func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool *ants.PoolWithFunc, logger *zap.Logger) error { var Components []orm.Component // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // TODO 将 for update 操作放到事务中 - result := _globalPostgresClient.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components) + result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components) if result.Error != nil { logger.Error("query circuit diagram component info failed", zap.Error(result.Error)) return result.Error diff --git a/database/query_page.go b/database/query_page.go index 723ad88..a974020 100644 --- a/database/query_page.go +++ b/database/query_page.go @@ -8,17 +8,18 @@ import ( "modelRT/orm" "go.uber.org/zap" + "gorm.io/gorm" "gorm.io/gorm/clause" ) // QueryAllPages return the all page info of the circuit diagram query by grid_id and zone_id and station_id -func QueryAllPages(ctx context.Context, logger *zap.Logger, gridID, zoneID, stationID int64) ([]orm.Page, error) { +func QueryAllPages(ctx context.Context, tx *gorm.DB, logger *zap.Logger, gridID, zoneID, stationID int64) ([]orm.Page, error) { var pages []orm.Page // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - // TODO 将 for update 操作放到事务中 - result := _globalPostgresClient.Model(&orm.Page{}).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select(`"page".id, "page".Name, "page".status,"page".context`).Joins(`inner join "station" on "station".id = "page".station_id`).Joins(`inner join "zone" on "zone".id = "station".zone_id`).Joins(`inner join "grid" on "grid".id = "zone".grid_id`).Where(`"grid".id = ? and "zone".id = ? and "station".id = ?`, gridID, zoneID, stationID).Scan(&pages) + + result := tx.Model(&orm.Page{}).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select(`"page".id, "page".Name, "page".status,"page".context`).Joins(`inner join "station" on "station".id = "page".station_id`).Joins(`inner join "zone" on "zone".id = "station".zone_id`).Joins(`inner join "grid" on "grid".id = "zone".grid_id`).Where(`"grid".id = ? and "zone".id = ? and "station".id = ?`, gridID, zoneID, stationID).Scan(&pages) if result.Error != nil { logger.Error("query circuit diagram pages by gridID and zoneID and stationID failed", zap.Int64("grid_id", gridID), zap.Int64("zone_id", zoneID), zap.Int64("station_id", stationID), zap.Error(result.Error)) diff --git a/database/query_topologic.go b/database/query_topologic.go index 1ceb7d2..c21ba86 100644 --- a/database/query_topologic.go +++ b/database/query_topologic.go @@ -11,17 +11,18 @@ import ( "github.com/gofrs/uuid" "go.uber.org/zap" + "gorm.io/gorm" "gorm.io/gorm/clause" ) // QueryTopologicByPageID return the topologic info of the circuit diagram query by pageID -func QueryTopologicByPageID(ctx context.Context, logger *zap.Logger, pageID int64) ([]orm.Topologic, error) { +func QueryTopologicByPageID(ctx context.Context, tx *gorm.DB, logger *zap.Logger, pageID int64) ([]orm.Topologic, error) { var topologics []orm.Topologic // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - // TODO 将 for update 操作放到事务中 - result := _globalPostgresClient.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Raw(sql.RecursiveSQL, pageID).Scan(&topologics) + + result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Raw(sql.RecursiveSQL, pageID).Scan(&topologics) if result.Error != nil { logger.Error("query circuit diagram topologic info by pageID failed", zap.Int64("pageID", pageID), zap.Error(result.Error)) return nil, result.Error @@ -30,19 +31,20 @@ func QueryTopologicByPageID(ctx context.Context, logger *zap.Logger, pageID int6 } // QueryTopologicFromDB return the result of query topologic info from postgresDB -func QueryTopologicFromDB(ctx context.Context, logger *zap.Logger, gridID, zoneID, stationID int64) error { - allPages, err := QueryAllPages(ctx, logger, gridID, zoneID, stationID) +func QueryTopologicFromDB(ctx context.Context, tx *gorm.DB, logger *zap.Logger, gridID, zoneID, stationID int64) error { + allPages, err := QueryAllPages(ctx, tx, logger, gridID, zoneID, stationID) if err != nil { logger.Error("query all pages info failed", zap.Int64("gridID", gridID), zap.Int64("zoneID", zoneID), zap.Int64("stationID", stationID), zap.Error(err)) return err } for _, page := range allPages { - topologicInfos, err := QueryTopologicByPageID(ctx, logger, page.ID) + topologicInfos, err := QueryTopologicByPageID(ctx, tx, logger, page.ID) if err != nil { logger.Error("query topologic info by pageID failed", zap.Int64("pageID", page.ID), zap.Error(err)) return err } + err = InitCircuitDiagramTopologic(page.ID, topologicInfos) if err != nil { logger.Error("init topologic failed", zap.Error(err)) diff --git a/main.go b/main.go index 5961298..ad4d9ce 100644 --- a/main.go +++ b/main.go @@ -84,19 +84,22 @@ func main() { defer cancel() go realtimedata.DataPolling(cancelCtx, pollingPool) - // load circuit diagram from postgres - err = database.QueryCircuitDiagramComponentFromDB(ctx, parsePool, zapLogger) - if err != nil { - zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err)) - panic(err) - } + postgresDBClient.Transaction(func(tx *gorm.DB) error { + // load circuit diagram from postgres + err := database.QueryCircuitDiagramComponentFromDB(ctx, tx, parsePool, zapLogger) + if err != nil { + zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err)) + panic(err) + } - // TODO 暂时屏蔽完成 swagger 启动测试 - err = database.QueryTopologicFromDB(ctx, zapLogger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID) - if err != nil { - zapLogger.Error("load topologic info from postgres failed", zap.Error(err)) - panic(err) - } + // TODO 暂时屏蔽完成 swagger 启动测试 + err = database.QueryTopologicFromDB(ctx, tx, zapLogger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID) + if err != nil { + zapLogger.Error("load topologic info from postgres failed", zap.Error(err)) + panic(err) + } + return nil + }) // TODO 完成订阅数据分析 // TODO 暂时屏蔽完成 swagger 启动测试 diff --git a/pool/concurrency_model_parse.go b/pool/concurrency_model_parse.go index 8072c41..3b499c1 100644 --- a/pool/concurrency_model_parse.go +++ b/pool/concurrency_model_parse.go @@ -70,7 +70,6 @@ var ParseFunc = func(parseConfig interface{}) { anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap) diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName) realtimedata.AnchorParamsChan <- anchorParam - } }