perf: parallelize GetFullMeasurementSet with errgroup

- run 5 independent DB queries concurrently via errgroup.WithContext
  - add ctx parameter and bind db with WithContext for cancellation support
  - replace silent error swallowing (if err == nil) with wrapped error returns
  - promote golang.org/x/sync to direct dependency in go.mod
This commit is contained in:
douxu 2026-05-18 16:49:46 +08:00
parent 42956d1793
commit d051c161b7
3 changed files with 77 additions and 48 deletions

View File

@ -2,8 +2,12 @@
package database
import (
"context"
"fmt"
"modelRT/orm"
"golang.org/x/sync/errgroup"
"gorm.io/gorm"
)
@ -17,7 +21,7 @@ type StationWithParent struct {
ZoneTag string `gorm:"column:zone_tag"`
}
func GetFullMeasurementSet(db *gorm.DB) (*orm.MeasurementSet, error) {
func GetFullMeasurementSet(ctx context.Context, db *gorm.DB) (*orm.MeasurementSet, error) {
mSet := &orm.MeasurementSet{
GridToZoneTags: make(map[string][]string),
ZoneToStationTags: make(map[string][]string),
@ -26,85 +30,110 @@ func GetFullMeasurementSet(db *gorm.DB) (*orm.MeasurementSet, error) {
CompTagToMeasTags: make(map[string][]string),
}
var grids []orm.Grid
if err := db.Table("grid").Select("tagname").Scan(&grids).Error; err == nil {
for _, g := range grids {
if g.TAGNAME != "" {
mSet.AllGridTags = append(mSet.AllGridTags, g.TAGNAME)
g, gctx := errgroup.WithContext(ctx)
db = db.WithContext(gctx)
g.Go(func() error {
var grids []orm.Grid
if err := db.Table("grid").Select("tagname").Scan(&grids).Error; err != nil {
return fmt.Errorf("query grids: %w", err)
}
for _, grid := range grids {
if grid.TAGNAME != "" {
mSet.AllGridTags = append(mSet.AllGridTags, grid.TAGNAME)
}
}
}
return nil
})
var zones []struct {
orm.Zone
GridTag string `gorm:"column:grid_tag"`
}
if err := db.Table("zone").
Select("zone.*, grid.tagname as grid_tag").
Joins("left join grid on zone.grid_id = grid.id").
Scan(&zones).Error; err == nil {
g.Go(func() error {
var zones []struct {
orm.Zone
GridTag string `gorm:"column:grid_tag"`
}
if err := db.Table("zone").
Select("zone.*, grid.tagname as grid_tag").
Joins("left join grid on zone.grid_id = grid.id").
Scan(&zones).Error; err != nil {
return fmt.Errorf("query zones: %w", err)
}
for _, z := range zones {
mSet.AllZoneTags = append(mSet.AllZoneTags, z.TAGNAME)
if z.GridTag != "" {
mSet.GridToZoneTags[z.GridTag] = append(mSet.GridToZoneTags[z.GridTag], z.TAGNAME)
}
}
}
return nil
})
var stations []struct {
orm.Station
ZoneTag string `gorm:"column:zone_tag"`
}
if err := db.Table("station").
Select("station.*, zone.tagname as zone_tag").
Joins("left join zone on station.zone_id = zone.id").
Scan(&stations).Error; err == nil {
g.Go(func() error {
var stations []struct {
orm.Station
ZoneTag string `gorm:"column:zone_tag"`
}
if err := db.Table("station").
Select("station.*, zone.tagname as zone_tag").
Joins("left join zone on station.zone_id = zone.id").
Scan(&stations).Error; err != nil {
return fmt.Errorf("query stations: %w", err)
}
for _, s := range stations {
mSet.AllStationTags = append(mSet.AllStationTags, s.TAGNAME)
if s.ZoneTag != "" {
mSet.ZoneToStationTags[s.ZoneTag] = append(mSet.ZoneToStationTags[s.ZoneTag], s.TAGNAME)
}
}
}
return nil
})
var comps []struct {
orm.Component
StationTag string `gorm:"column:station_tag"`
}
if err := db.Table("component").
Select("component.*, station.tagname as station_tag").
Joins("left join station on component.station_id = station.id").
Scan(&comps).Error; err == nil {
g.Go(func() error {
var comps []struct {
orm.Component
StationTag string `gorm:"column:station_tag"`
}
if err := db.Table("component").
Select("component.*, station.tagname as station_tag").
Joins("left join station on component.station_id = station.id").
Scan(&comps).Error; err != nil {
return fmt.Errorf("query components: %w", err)
}
for _, c := range comps {
mSet.AllCompNSPaths = append(mSet.AllCompNSPaths, c.NSPath)
mSet.AllCompTags = append(mSet.AllCompTags, c.Tag)
if c.StationTag != "" {
mSet.StationToCompNSPaths[c.StationTag] = append(mSet.StationToCompNSPaths[c.StationTag], c.NSPath)
}
if c.NSPath != "" {
mSet.CompNSPathToCompTags[c.NSPath] = append(mSet.CompNSPathToCompTags[c.NSPath], c.Tag)
}
}
}
return nil
})
mSet.AllConfigTags = append(mSet.AllConfigTags, "bay")
var measurements []struct {
orm.Measurement
CompTag string `gorm:"column:comp_tag"`
}
if err := db.Table("measurement").
Select("measurement.*, component.tag as comp_tag").
Joins("left join component on measurement.component_uuid = component.global_uuid").
Scan(&measurements).Error; err == nil {
g.Go(func() error {
var measurements []struct {
orm.Measurement
CompTag string `gorm:"column:comp_tag"`
}
if err := db.Table("measurement").
Select("measurement.*, component.tag as comp_tag").
Joins("left join component on measurement.component_uuid = component.global_uuid").
Scan(&measurements).Error; err != nil {
return fmt.Errorf("query measurements: %w", err)
}
for _, m := range measurements {
mSet.AllMeasTags = append(mSet.AllMeasTags, m.Tag)
if m.CompTag != "" {
mSet.CompTagToMeasTags[m.CompTag] = append(mSet.CompTagToMeasTags[m.CompTag], m.Tag)
}
}
return nil
})
if err := g.Wait(); err != nil {
return nil, err
}
mSet.AllConfigTags = append(mSet.AllConfigTags, "bay")
return mSet, nil
}

2
go.mod
View File

@ -27,6 +27,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.20.0
gorm.io/driver/mysql v1.5.7
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.12
@ -94,7 +95,6 @@ require (
golang.org/x/crypto v0.49.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
golang.org/x/tools v0.42.0 // indirect

View File

@ -214,7 +214,7 @@ func main() {
panic(err)
}
measurementSet, err := database.GetFullMeasurementSet(tx)
measurementSet, err := database.GetFullMeasurementSet(ctx, tx)
if err != nil {
logger.Error(ctx, "generate component measurement group failed", "error", err)
panic(err)