diff --git a/config/config.go b/config/config.go index 1425f45..c4a69d7 100644 --- a/config/config.go +++ b/config/config.go @@ -92,6 +92,12 @@ type DataRTConfig struct { Method string `mapstructure:"polling_api_method"` } +// OtelConfig define config struct of OpenTelemetry tracing +type OtelConfig struct { + Endpoint string `mapstructure:"endpoint"` // e.g. "localhost:4318" + Insecure bool `mapstructure:"insecure"` +} + // AsyncTaskConfig define config struct of asynchronous task system type AsyncTaskConfig struct { WorkerPoolSize int `mapstructure:"worker_pool_size"` @@ -115,7 +121,8 @@ type ModelRTConfig struct { LockerRedisConfig RedisConfig `mapstructure:"locker_redis"` StorageRedisConfig RedisConfig `mapstructure:"storage_redis"` AsyncTaskConfig AsyncTaskConfig `mapstructure:"async_task"` - PostgresDBURI string `mapstructure:"-"` + OtelConfig OtelConfig `mapstructure:"otel"` + PostgresDBURI string `mapstructure:"-"` } // ReadAndInitConfig return modelRT project config struct diff --git a/database/create_component.go b/database/create_component.go index 1c288c0..3d304a5 100644 --- a/database/create_component.go +++ b/database/create_component.go @@ -33,7 +33,7 @@ func CreateComponentIntoDB(ctx context.Context, tx *gorm.DB, componentInfo netwo Name: componentInfo.Name, Context: componentInfo.Context, Op: componentInfo.Op, - Ts: time.Now(), + TS: time.Now(), } result := tx.WithContext(cancelCtx).Create(&component) diff --git a/database/create_measurement.go b/database/create_measurement.go index 4085c94..30d6ad6 100644 --- a/database/create_measurement.go +++ b/database/create_measurement.go @@ -35,7 +35,7 @@ func CreateMeasurement(ctx context.Context, tx *gorm.DB, measurementInfo network BayUUID: globalUUID, ComponentUUID: globalUUID, Op: -1, - Ts: time.Now(), + TS: time.Now(), } result := tx.WithContext(cancelCtx).Create(&measurement) diff --git a/database/query_bay.go b/database/query_bay.go new file mode 100644 index 0000000..04ca639 --- /dev/null +++ b/database/query_bay.go @@ -0,0 +1,56 @@ +// Package database define database operation functions +package database + +import ( + "context" + "time" + + "modelRT/logger" + "modelRT/orm" + + "github.com/gofrs/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// QueryBayByUUID returns the Bay record matching bayUUID. +func QueryBayByUUID(ctx context.Context, tx *gorm.DB, bayUUID uuid.UUID) (*orm.Bay, error) { + var bay orm.Bay + + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("bay_uuid = ?", bayUUID). + Clauses(clause.Locking{Strength: "UPDATE"}). + First(&bay) + + if result.Error != nil { + return nil, result.Error + } + return &bay, nil +} + +// QueryBaysByUUIDs returns Bay records matching the given UUIDs in a single query. +// The returned slice preserves database order; unmatched UUIDs are silently omitted. +func QueryBaysByUUIDs(ctx context.Context, tx *gorm.DB, bayUUIDs []uuid.UUID) ([]orm.Bay, error) { + if len(bayUUIDs) == 0 { + return nil, nil + } + + var bays []orm.Bay + + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Where("bay_uuid IN ?", bayUUIDs). + Clauses(clause.Locking{Strength: "UPDATE"}). + Find(&bays) + + if result.Error != nil { + logger.Error(ctx, "query bays by uuids failed", "error", result.Error) + return nil, result.Error + } + return bays, nil +} diff --git a/database/query_component.go b/database/query_component.go index 9e8798d..73ca27c 100644 --- a/database/query_component.go +++ b/database/query_component.go @@ -148,6 +148,39 @@ func QueryLongIdentModelInfoByToken(ctx context.Context, tx *gorm.DB, measTag st return &resultComp, &meauserment, nil } +// QueryComponentsInServiceByUUIDs returns a map of global_uuid → in_service for the +// given UUIDs. Only global_uuid and in_service columns are selected for efficiency. +func QueryComponentsInServiceByUUIDs(ctx context.Context, tx *gorm.DB, uuids []uuid.UUID) (map[uuid.UUID]bool, error) { + if len(uuids) == 0 { + return make(map[uuid.UUID]bool), nil + } + + type row struct { + GlobalUUID uuid.UUID `gorm:"column:global_uuid"` + InService bool `gorm:"column:in_service"` + } + + var rows []row + + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Model(&orm.Component{}). + Select("global_uuid, in_service"). + Where("global_uuid IN ?", uuids). + Scan(&rows) + if result.Error != nil { + return nil, result.Error + } + + m := make(map[uuid.UUID]bool, len(rows)) + for _, r := range rows { + m[r.GlobalUUID] = r.InService + } + return m, nil +} + // QueryShortIdentModelInfoByToken define func to query short identity model info by short token func QueryShortIdentModelInfoByToken(ctx context.Context, tx *gorm.DB, measTag string, condition *orm.Component) (*orm.Component, *orm.Measurement, error) { var resultComp orm.Component diff --git a/database/query_topologic.go b/database/query_topologic.go index eea21b1..4799875 100644 --- a/database/query_topologic.go +++ b/database/query_topologic.go @@ -32,71 +32,51 @@ func QueryTopologic(ctx context.Context, tx *gorm.DB) ([]orm.Topologic, error) { return topologics, nil } -// QueryTopologicFromDB return the result of query topologic info from DB -func QueryTopologicFromDB(ctx context.Context, tx *gorm.DB) (*diagram.MultiBranchTreeNode, error) { +// QueryTopologicByStartUUID returns all edges reachable from startUUID following +// directed uuid_from → uuid_to edges in the topologic table. +func QueryTopologicByStartUUID(ctx context.Context, tx *gorm.DB, startUUID uuid.UUID) ([]orm.Topologic, error) { + var topologics []orm.Topologic + + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + result := tx.WithContext(cancelCtx). + Clauses(clause.Locking{Strength: "UPDATE"}). + Raw(sql.RecursiveSQL, startUUID). + Scan(&topologics) + if result.Error != nil { + logger.Error(ctx, "query topologic by start uuid failed", "start_uuid", startUUID, "error", result.Error) + return nil, result.Error + } + return topologics, nil +} + +// QueryTopologicFromDB return the result of query topologic info from DB. +// Returns the root node and a flat nodeMap for O(1) lookup by UUID. +func QueryTopologicFromDB(ctx context.Context, tx *gorm.DB) (*diagram.MultiBranchTreeNode, map[uuid.UUID]*diagram.MultiBranchTreeNode, error) { topologicInfos, err := QueryTopologic(ctx, tx) if err != nil { logger.Error(ctx, "query topologic info failed", "error", err) - return nil, err + return nil, nil, err } - tree, err := BuildMultiBranchTree(topologicInfos) + tree, nodeMap, err := BuildMultiBranchTree(topologicInfos) if err != nil { logger.Error(ctx, "init topologic failed", "error", err) - return nil, err + return nil, nil, err } - return tree, nil + return tree, nodeMap, nil } -// InitCircuitDiagramTopologic return circuit diagram topologic info from postgres -func InitCircuitDiagramTopologic(topologicNodes []orm.Topologic) error { - var rootVertex *diagram.MultiBranchTreeNode - for _, node := range topologicNodes { - if node.UUIDFrom == constants.UUIDNil { - rootVertex = diagram.NewMultiBranchTree(node.UUIDFrom) - break - } - } - - if rootVertex == nil { - return fmt.Errorf("root vertex is nil") - } - - for _, node := range topologicNodes { - if node.UUIDFrom == constants.UUIDNil { - nodeVertex := diagram.NewMultiBranchTree(node.UUIDTo) - rootVertex.AddChild(nodeVertex) - } - } - - node := rootVertex - for _, nodeVertex := range node.Children { - nextVertexs := make([]*diagram.MultiBranchTreeNode, 0) - nextVertexs = append(nextVertexs, nodeVertex) - } - return nil -} - -// TODO 电流互感器不单独划分间隔,以母线、浇筑母线、变压器为间隔原件 -func IntervalBoundaryDetermine(uuid uuid.UUID) bool { - diagram.GetComponentMap(uuid.String()) - // TODO 判断 component 的类型是否为间隔 - // TODO 0xA1B2C3D4,高四位表示可以成为间隔的compoent类型的值为FFFF,普通 component 类型的值为 0000。低四位中前二位表示component的一级类型,例如母线 PT、母联/母分、进线等,低四位中后二位表示一级类型中包含的具体类型,例如母线 PT中包含的电压互感器、隔离开关、接地开关、避雷器、带电显示器等。 - num := uint32(0xA1B2C3D4) // 八位16进制数 - high16 := uint16(num >> 16) - fmt.Printf("原始值: 0x%X\n", num) // 输出: 0xA1B2C3D4 - fmt.Printf("高十六位: 0x%X\n", high16) // 输出: 0xA1B2 - return true -} - -// BuildMultiBranchTree return the multi branch tree by topologic info and component type map -func BuildMultiBranchTree(topologics []orm.Topologic) (*diagram.MultiBranchTreeNode, error) { +// BuildMultiBranchTree return the multi branch tree by topologic info. +// Returns the root node and a flat nodeMap for O(1) lookup by UUID. +func BuildMultiBranchTree(topologics []orm.Topologic) (*diagram.MultiBranchTreeNode, map[uuid.UUID]*diagram.MultiBranchTreeNode, error) { nodeMap := make(map[uuid.UUID]*diagram.MultiBranchTreeNode, len(topologics)*2) for _, topo := range topologics { if _, exists := nodeMap[topo.UUIDFrom]; !exists { - // skip special uuid - if topo.UUIDTo != constants.UUIDNil { + // UUIDNil is the virtual root sentinel — skip creating a regular node for it + if topo.UUIDFrom != constants.UUIDNil { nodeMap[topo.UUIDFrom] = &diagram.MultiBranchTreeNode{ ID: topo.UUIDFrom, Children: make([]*diagram.MultiBranchTreeNode, 0), @@ -105,7 +85,6 @@ func BuildMultiBranchTree(topologics []orm.Topologic) (*diagram.MultiBranchTreeN } if _, exists := nodeMap[topo.UUIDTo]; !exists { - // skip special uuid if topo.UUIDTo != constants.UUIDNil { nodeMap[topo.UUIDTo] = &diagram.MultiBranchTreeNode{ ID: topo.UUIDTo, @@ -118,10 +97,13 @@ func BuildMultiBranchTree(topologics []orm.Topologic) (*diagram.MultiBranchTreeN for _, topo := range topologics { var parent *diagram.MultiBranchTreeNode if topo.UUIDFrom == constants.UUIDNil { - parent = &diagram.MultiBranchTreeNode{ - ID: constants.UUIDNil, + if _, exists := nodeMap[constants.UUIDNil]; !exists { + nodeMap[constants.UUIDNil] = &diagram.MultiBranchTreeNode{ + ID: constants.UUIDNil, + Children: make([]*diagram.MultiBranchTreeNode, 0), + } } - nodeMap[constants.UUIDNil] = parent + parent = nodeMap[constants.UUIDNil] } else { parent = nodeMap[topo.UUIDFrom] } @@ -141,7 +123,7 @@ func BuildMultiBranchTree(topologics []orm.Topologic) (*diagram.MultiBranchTreeN // return root vertex root, exists := nodeMap[constants.UUIDNil] if !exists { - return nil, fmt.Errorf("root node not found") + return nil, nil, fmt.Errorf("root node not found") } - return root, nil + return root, nodeMap, nil } diff --git a/database/update_component.go b/database/update_component.go index 7957bc8..e08eea9 100644 --- a/database/update_component.go +++ b/database/update_component.go @@ -43,7 +43,7 @@ func UpdateComponentIntoDB(ctx context.Context, tx *gorm.DB, componentInfo netwo Name: componentInfo.Name, Context: componentInfo.Context, Op: componentInfo.Op, - Ts: time.Now(), + TS: time.Now(), } result = tx.Model(&orm.Component{}).WithContext(cancelCtx).Where("GLOBAL_UUID = ?", component.GlobalUUID).Updates(&updateParams) diff --git a/deploy/jaeger.yaml b/deploy/jaeger.yaml new file mode 100644 index 0000000..8dac477 --- /dev/null +++ b/deploy/jaeger.yaml @@ -0,0 +1,60 @@ +apiVersion: v1 +kind: Service +metadata: + name: jaeger + labels: + app: jaeger +spec: + ports: + - name: ui + port: 16686 + targetPort: 16686 + nodePort: 31686 # Jaeger UI,浏览器访问 http://:31686 + - name: collector-http + port: 14268 + targetPort: 14268 + nodePort: 31268 # Jaeger 原生 HTTP collector(非 OTel) + - name: otlp-http + port: 4318 + targetPort: 4318 + nodePort: 31318 # OTLP HTTP,集群外使用 :31318 + - name: otlp-grpc + port: 4317 + targetPort: 4317 + nodePort: 31317 # OTLP gRPC,集群外使用 :31317 + selector: + app: jaeger + type: NodePort +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: jaeger +spec: + replicas: 1 + selector: + matchLabels: + app: jaeger + template: + metadata: + labels: + app: jaeger + spec: + containers: + - name: jaeger + image: jaegertracing/all-in-one:1.56 + env: + - name: COLLECTOR_OTLP_ENABLED + value: "true" + ports: + - containerPort: 16686 # UI + - containerPort: 14268 # Jaeger Collector + - containerPort: 4317 # OTLP gRPC + - containerPort: 4318 # OTLP HTTP + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 128Mi diff --git a/diagram/anchor_set.go b/diagram/anchor_set.go index 94a83f4..a6447ca 100644 --- a/diagram/anchor_set.go +++ b/diagram/anchor_set.go @@ -1,3 +1,4 @@ +// Package diagram provide diagram data structure and operation package diagram import ( @@ -31,11 +32,9 @@ func UpdateAnchorValue(componentUUID string, anchorValue string) bool { // StoreAnchorValue define func of store anchor value with componentUUID and anchor name func StoreAnchorValue(componentUUID string, anchorValue string) { anchorValueOverview.Store(componentUUID, anchorValue) - return } // DeleteAnchorValue define func of delete anchor value with componentUUID func DeleteAnchorValue(componentUUID string) { anchorValueOverview.Delete(componentUUID) - return } diff --git a/diagram/component_set.go b/diagram/component_set.go index 7a7b6c5..da9bddf 100644 --- a/diagram/component_set.go +++ b/diagram/component_set.go @@ -1,3 +1,4 @@ +// Package diagram provide diagram data structure and operation package diagram import ( @@ -33,11 +34,9 @@ func UpdateComponentMap(componentID int64, componentInfo *orm.Component) bool { // StoreComponentMap define func of store circuit diagram data with component uuid and component info func StoreComponentMap(componentUUID string, componentInfo *orm.Component) { diagramsOverview.Store(componentUUID, componentInfo) - return } // DeleteComponentMap define func of delete circuit diagram data with component uuid func DeleteComponentMap(componentUUID string) { diagramsOverview.Delete(componentUUID) - return } diff --git a/diagram/hash_test.go b/diagram/hash_test.go index ed320f3..5ba91fd 100644 --- a/diagram/hash_test.go +++ b/diagram/hash_test.go @@ -1,3 +1,4 @@ +// Package diagram provide diagram data structure and operation package diagram import ( @@ -29,5 +30,4 @@ func TestHMSet(t *testing.T) { fmt.Printf("err:%v\n", err) } fmt.Printf("res:%v\n", res) - return } diff --git a/diagram/multi_branch_tree.go b/diagram/multi_branch_tree.go index ceb6cfa..d88393b 100644 --- a/diagram/multi_branch_tree.go +++ b/diagram/multi_branch_tree.go @@ -1,3 +1,4 @@ +// Package diagram provide diagram data structure and operation package diagram import ( @@ -62,3 +63,63 @@ func (n *MultiBranchTreeNode) PrintTree(level int) { child.PrintTree(level + 1) } } + +// FindPath returns the ordered node sequence from startID to endID using the +// supplied nodeMap for O(1) lookup. It walks each node up to the root to find +// the LCA, then stitches the two half-paths together. +// Returns nil when either node is absent from nodeMap or no path exists. +func FindPath(startID, endID uuid.UUID, nodeMap map[uuid.UUID]*MultiBranchTreeNode) []*MultiBranchTreeNode { + startNode, ok := nodeMap[startID] + if !ok { + return nil + } + endNode, ok := nodeMap[endID] + if !ok { + return nil + } + + // collect ancestors (inclusive) from a node up to the root sentinel + ancestors := func(n *MultiBranchTreeNode) []*MultiBranchTreeNode { + var chain []*MultiBranchTreeNode + for n != nil { + chain = append(chain, n) + n = n.Parent + } + return chain + } + + startChain := ancestors(startNode) // [start, ..., root] + endChain := ancestors(endNode) // [end, ..., root] + + // index startChain by ID for fast LCA detection + startIdx := make(map[uuid.UUID]int, len(startChain)) + for i, node := range startChain { + startIdx[node.ID] = i + } + + // find LCA: first node in endChain that also appears in startChain + lcaEndPos := -1 + lcaStartPos := -1 + for i, node := range endChain { + if j, found := startIdx[node.ID]; found { + lcaEndPos = i + lcaStartPos = j + break + } + } + + if lcaEndPos < 0 { + return nil // disconnected + } + + // path = startChain[0..lcaStartPos] reversed + endChain[lcaEndPos..0] reversed + path := make([]*MultiBranchTreeNode, 0, lcaStartPos+lcaEndPos+1) + for i := 0; i <= lcaStartPos; i++ { + path = append(path, startChain[i]) + } + // append end-side (skip LCA to avoid duplication), reversed + for i := lcaEndPos - 1; i >= 0; i-- { + path = append(path, endChain[i]) + } + return path +} diff --git a/diagram/topologic_set.go b/diagram/topologic_set.go index 9dbbec3..607584f 100644 --- a/diagram/topologic_set.go +++ b/diagram/topologic_set.go @@ -1,3 +1,4 @@ +// Package diagram provide diagram data structure and operation package diagram import ( @@ -39,11 +40,9 @@ func UpdateGrapMap(pageID int64, graphInfo *Graph) bool { // StoreGraphMap define func of store circuit diagram topologic data with pageID and topologic info func StoreGraphMap(pageID int64, graphInfo *Graph) { graphOverview.Store(pageID, graphInfo) - return } // DeleteGraphMap define func of delete circuit diagram topologic data with pageID func DeleteGraphMap(pageID int64) { graphOverview.Delete(pageID) - return } diff --git a/docs/docs.go b/docs/docs.go index ab3d020..f674f98 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -102,13 +102,12 @@ const docTemplate = `{ "summary": "测量点推荐(搜索框自动补全)", "parameters": [ { - "description": "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'", - "name": "request", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/network.MeasurementRecommendRequest" - } + "type": "string", + "example": "\"grid1\"", + "description": "推荐关键词,例如 'grid1' 或 'grid1.'", + "name": "input", + "in": "query", + "required": true } ], "responses": { @@ -176,19 +175,400 @@ const docTemplate = `{ } } } + }, + "/monitors/data/realtime/stream/:clientID": { + "get": { + "description": "根据用户输入的clientID拉取对应的实时数据", + "tags": [ + "RealTime Component Websocket" + ], + "summary": "实时数据拉取 websocket api", + "responses": {} + } + }, + "/monitors/data/subscriptions": { + "post": { + "description": "根据用户输入的组件token,从 modelRT 服务中开始或结束对于量测节点的实时数据的订阅", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "RealTime Component" + ], + "summary": "开始或结束订阅实时数据", + "parameters": [ + { + "description": "量测节点实时数据订阅", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/network.RealTimeSubRequest" + } + } + ], + "responses": { + "2000": { + "description": "订阅实时数据结果列表", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeSubPayload" + } + } + } + ] + } + }, + "3000": { + "description": "订阅实时数据结果列表", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.FailureResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeSubPayload" + } + } + } + ] + } + } + } + } + }, + "/task/async": { + "post": { + "description": "创建新的异步任务并返回任务ID,任务将被提交到队列等待处理", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "创建异步任务", + "parameters": [ + { + "description": "任务创建请求", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/network.AsyncTaskCreateRequest" + } + } + ], + "responses": { + "200": { + "description": "任务创建成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskCreateResponse" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/results": { + "get": { + "description": "根据任务ID列表查询异步任务的状态和结果", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "查询异步任务结果", + "parameters": [ + { + "type": "string", + "description": "任务ID列表,用逗号分隔", + "name": "task_ids", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "查询成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskResultQueryResponse" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/{task_id}": { + "get": { + "description": "根据任务ID查询异步任务的详细状态和结果", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "查询异步任务详情", + "parameters": [ + { + "type": "string", + "description": "任务ID", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "查询成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskResult" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "404": { + "description": "任务不存在", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/{task_id}/cancel": { + "post": { + "description": "取消指定ID的异步任务(如果任务尚未开始执行)", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "取消异步任务", + "parameters": [ + { + "type": "string", + "description": "任务ID", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "任务取消成功", + "schema": { + "$ref": "#/definitions/network.SuccessResponse" + } + }, + "400": { + "description": "请求参数错误或任务无法取消", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "404": { + "description": "任务不存在", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } } }, "definitions": { + "network.AsyncTaskCreateRequest": { + "type": "object", + "properties": { + "params": { + "description": "required: true", + "type": "object" + }, + "task_type": { + "description": "required: true\nenum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT", + "type": "string", + "example": "TOPOLOGY_ANALYSIS" + } + } + }, + "network.AsyncTaskCreateResponse": { + "type": "object", + "properties": { + "task_id": { + "type": "string", + "example": "123e4567-e89b-12d3-a456-426614174000" + } + } + }, + "network.AsyncTaskResult": { + "type": "object", + "properties": { + "created_at": { + "type": "integer", + "example": 1741846200 + }, + "error_code": { + "type": "integer", + "example": 400102 + }, + "error_detail": { + "type": "object" + }, + "error_message": { + "type": "string", + "example": "Component UUID not found" + }, + "finished_at": { + "type": "integer", + "example": 1741846205 + }, + "progress": { + "type": "integer", + "example": 65 + }, + "result": { + "type": "object" + }, + "status": { + "type": "string", + "example": "COMPLETED" + }, + "task_id": { + "type": "string", + "example": "123e4567-e89b-12d3-a456-426614174000" + }, + "task_type": { + "type": "string", + "example": "TOPOLOGY_ANALYSIS" + } + } + }, + "network.AsyncTaskResultQueryResponse": { + "type": "object", + "properties": { + "tasks": { + "type": "array", + "items": { + "$ref": "#/definitions/network.AsyncTaskResult" + } + }, + "total": { + "type": "integer", + "example": 3 + } + } + }, "network.FailureResponse": { "type": "object", "properties": { "code": { "type": "integer", - "example": 500 + "example": 3000 }, "msg": { "type": "string", - "example": "failed to get recommend data from redis" + "example": "process completed with partial failures" }, "payload": { "type": "object" @@ -216,15 +596,10 @@ const docTemplate = `{ " \"I_B_rms\"", "\"I_C_rms\"]" ] - } - } - }, - "network.MeasurementRecommendRequest": { - "type": "object", - "properties": { - "input": { + }, + "recommended_type": { "type": "string", - "example": "trans" + "example": "grid_tag" } } }, @@ -237,21 +612,93 @@ const docTemplate = `{ } } }, + "network.RealTimeMeasurementItem": { + "type": "object", + "properties": { + "interval": { + "type": "string", + "example": "1" + }, + "targets": { + "type": "array", + "items": { + "type": "string" + }, + "example": [ + "[\"grid1.zone1.station1.ns1.tag1.bay.I11_A_rms\"", + "\"grid1.zone1.station1.ns1.tag1.tag1.bay.I11_B_rms\"]" + ] + } + } + }, + "network.RealTimeSubPayload": { + "type": "object", + "properties": { + "client_id": { + "type": "string", + "example": "5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" + }, + "targets": { + "type": "array", + "items": { + "$ref": "#/definitions/network.TargetResult" + } + } + } + }, + "network.RealTimeSubRequest": { + "type": "object", + "properties": { + "action": { + "description": "required: true\nenum: [start, stop]", + "type": "string", + "example": "start" + }, + "client_id": { + "type": "string", + "example": "5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" + }, + "measurements": { + "description": "required: true", + "type": "array", + "items": { + "$ref": "#/definitions/network.RealTimeMeasurementItem" + } + } + } + }, "network.SuccessResponse": { "type": "object", "properties": { "code": { "type": "integer", - "example": 200 + "example": 2000 }, "msg": { "type": "string", - "example": "success" + "example": "process completed" }, "payload": { "type": "object" } } + }, + "network.TargetResult": { + "type": "object", + "properties": { + "code": { + "type": "integer", + "example": 20000 + }, + "id": { + "type": "string", + "example": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms" + }, + "msg": { + "type": "string", + "example": "subscription success" + } + } } } }` diff --git a/docs/swagger.json b/docs/swagger.json index 92f20fa..c6bb033 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -96,13 +96,12 @@ "summary": "测量点推荐(搜索框自动补全)", "parameters": [ { - "description": "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'", - "name": "request", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/network.MeasurementRecommendRequest" - } + "type": "string", + "example": "\"grid1\"", + "description": "推荐关键词,例如 'grid1' 或 'grid1.'", + "name": "input", + "in": "query", + "required": true } ], "responses": { @@ -170,19 +169,400 @@ } } } + }, + "/monitors/data/realtime/stream/:clientID": { + "get": { + "description": "根据用户输入的clientID拉取对应的实时数据", + "tags": [ + "RealTime Component Websocket" + ], + "summary": "实时数据拉取 websocket api", + "responses": {} + } + }, + "/monitors/data/subscriptions": { + "post": { + "description": "根据用户输入的组件token,从 modelRT 服务中开始或结束对于量测节点的实时数据的订阅", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "RealTime Component" + ], + "summary": "开始或结束订阅实时数据", + "parameters": [ + { + "description": "量测节点实时数据订阅", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/network.RealTimeSubRequest" + } + } + ], + "responses": { + "2000": { + "description": "订阅实时数据结果列表", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeSubPayload" + } + } + } + ] + } + }, + "3000": { + "description": "订阅实时数据结果列表", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.FailureResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.RealTimeSubPayload" + } + } + } + ] + } + } + } + } + }, + "/task/async": { + "post": { + "description": "创建新的异步任务并返回任务ID,任务将被提交到队列等待处理", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "创建异步任务", + "parameters": [ + { + "description": "任务创建请求", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/network.AsyncTaskCreateRequest" + } + } + ], + "responses": { + "200": { + "description": "任务创建成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskCreateResponse" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/results": { + "get": { + "description": "根据任务ID列表查询异步任务的状态和结果", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "查询异步任务结果", + "parameters": [ + { + "type": "string", + "description": "任务ID列表,用逗号分隔", + "name": "task_ids", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "查询成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskResultQueryResponse" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/{task_id}": { + "get": { + "description": "根据任务ID查询异步任务的详细状态和结果", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "查询异步任务详情", + "parameters": [ + { + "type": "string", + "description": "任务ID", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "查询成功", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/network.SuccessResponse" + }, + { + "type": "object", + "properties": { + "payload": { + "$ref": "#/definitions/network.AsyncTaskResult" + } + } + } + ] + } + }, + "400": { + "description": "请求参数错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "404": { + "description": "任务不存在", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } + }, + "/task/async/{task_id}/cancel": { + "post": { + "description": "取消指定ID的异步任务(如果任务尚未开始执行)", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "AsyncTask" + ], + "summary": "取消异步任务", + "parameters": [ + { + "type": "string", + "description": "任务ID", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "任务取消成功", + "schema": { + "$ref": "#/definitions/network.SuccessResponse" + } + }, + "400": { + "description": "请求参数错误或任务无法取消", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "404": { + "description": "任务不存在", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + }, + "500": { + "description": "服务器内部错误", + "schema": { + "$ref": "#/definitions/network.FailureResponse" + } + } + } + } } }, "definitions": { + "network.AsyncTaskCreateRequest": { + "type": "object", + "properties": { + "params": { + "description": "required: true", + "type": "object" + }, + "task_type": { + "description": "required: true\nenum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT", + "type": "string", + "example": "TOPOLOGY_ANALYSIS" + } + } + }, + "network.AsyncTaskCreateResponse": { + "type": "object", + "properties": { + "task_id": { + "type": "string", + "example": "123e4567-e89b-12d3-a456-426614174000" + } + } + }, + "network.AsyncTaskResult": { + "type": "object", + "properties": { + "created_at": { + "type": "integer", + "example": 1741846200 + }, + "error_code": { + "type": "integer", + "example": 400102 + }, + "error_detail": { + "type": "object" + }, + "error_message": { + "type": "string", + "example": "Component UUID not found" + }, + "finished_at": { + "type": "integer", + "example": 1741846205 + }, + "progress": { + "type": "integer", + "example": 65 + }, + "result": { + "type": "object" + }, + "status": { + "type": "string", + "example": "COMPLETED" + }, + "task_id": { + "type": "string", + "example": "123e4567-e89b-12d3-a456-426614174000" + }, + "task_type": { + "type": "string", + "example": "TOPOLOGY_ANALYSIS" + } + } + }, + "network.AsyncTaskResultQueryResponse": { + "type": "object", + "properties": { + "tasks": { + "type": "array", + "items": { + "$ref": "#/definitions/network.AsyncTaskResult" + } + }, + "total": { + "type": "integer", + "example": 3 + } + } + }, "network.FailureResponse": { "type": "object", "properties": { "code": { "type": "integer", - "example": 500 + "example": 3000 }, "msg": { "type": "string", - "example": "failed to get recommend data from redis" + "example": "process completed with partial failures" }, "payload": { "type": "object" @@ -210,15 +590,10 @@ " \"I_B_rms\"", "\"I_C_rms\"]" ] - } - } - }, - "network.MeasurementRecommendRequest": { - "type": "object", - "properties": { - "input": { + }, + "recommended_type": { "type": "string", - "example": "trans" + "example": "grid_tag" } } }, @@ -231,21 +606,93 @@ } } }, + "network.RealTimeMeasurementItem": { + "type": "object", + "properties": { + "interval": { + "type": "string", + "example": "1" + }, + "targets": { + "type": "array", + "items": { + "type": "string" + }, + "example": [ + "[\"grid1.zone1.station1.ns1.tag1.bay.I11_A_rms\"", + "\"grid1.zone1.station1.ns1.tag1.tag1.bay.I11_B_rms\"]" + ] + } + } + }, + "network.RealTimeSubPayload": { + "type": "object", + "properties": { + "client_id": { + "type": "string", + "example": "5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" + }, + "targets": { + "type": "array", + "items": { + "$ref": "#/definitions/network.TargetResult" + } + } + } + }, + "network.RealTimeSubRequest": { + "type": "object", + "properties": { + "action": { + "description": "required: true\nenum: [start, stop]", + "type": "string", + "example": "start" + }, + "client_id": { + "type": "string", + "example": "5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" + }, + "measurements": { + "description": "required: true", + "type": "array", + "items": { + "$ref": "#/definitions/network.RealTimeMeasurementItem" + } + } + } + }, "network.SuccessResponse": { "type": "object", "properties": { "code": { "type": "integer", - "example": 200 + "example": 2000 }, "msg": { "type": "string", - "example": "success" + "example": "process completed" }, "payload": { "type": "object" } } + }, + "network.TargetResult": { + "type": "object", + "properties": { + "code": { + "type": "integer", + "example": 20000 + }, + "id": { + "type": "string", + "example": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms" + }, + "msg": { + "type": "string", + "example": "subscription success" + } + } } } } \ No newline at end of file diff --git a/docs/swagger.yaml b/docs/swagger.yaml index e540f45..598c8d9 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1,12 +1,71 @@ basePath: /api/v1 definitions: + network.AsyncTaskCreateRequest: + properties: + params: + description: 'required: true' + type: object + task_type: + description: |- + required: true + enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT + example: TOPOLOGY_ANALYSIS + type: string + type: object + network.AsyncTaskCreateResponse: + properties: + task_id: + example: 123e4567-e89b-12d3-a456-426614174000 + type: string + type: object + network.AsyncTaskResult: + properties: + created_at: + example: 1741846200 + type: integer + error_code: + example: 400102 + type: integer + error_detail: + type: object + error_message: + example: Component UUID not found + type: string + finished_at: + example: 1741846205 + type: integer + progress: + example: 65 + type: integer + result: + type: object + status: + example: COMPLETED + type: string + task_id: + example: 123e4567-e89b-12d3-a456-426614174000 + type: string + task_type: + example: TOPOLOGY_ANALYSIS + type: string + type: object + network.AsyncTaskResultQueryResponse: + properties: + tasks: + items: + $ref: '#/definitions/network.AsyncTaskResult' + type: array + total: + example: 3 + type: integer + type: object network.FailureResponse: properties: code: - example: 500 + example: 3000 type: integer msg: - example: failed to get recommend data from redis + example: process completed with partial failures type: string payload: type: object @@ -27,11 +86,8 @@ definitions: items: type: string type: array - type: object - network.MeasurementRecommendRequest: - properties: - input: - example: trans + recommended_type: + example: grid_tag type: string type: object network.RealTimeDataPayload: @@ -40,17 +96,69 @@ definitions: description: TODO 增加example tag type: object type: object + network.RealTimeMeasurementItem: + properties: + interval: + example: "1" + type: string + targets: + example: + - '["grid1.zone1.station1.ns1.tag1.bay.I11_A_rms"' + - '"grid1.zone1.station1.ns1.tag1.tag1.bay.I11_B_rms"]' + items: + type: string + type: array + type: object + network.RealTimeSubPayload: + properties: + client_id: + example: 5d72f2d9-e33a-4f1b-9c76-88a44b9a953e + type: string + targets: + items: + $ref: '#/definitions/network.TargetResult' + type: array + type: object + network.RealTimeSubRequest: + properties: + action: + description: |- + required: true + enum: [start, stop] + example: start + type: string + client_id: + example: 5d72f2d9-e33a-4f1b-9c76-88a44b9a953e + type: string + measurements: + description: 'required: true' + items: + $ref: '#/definitions/network.RealTimeMeasurementItem' + type: array + type: object network.SuccessResponse: properties: code: - example: 200 + example: 2000 type: integer msg: - example: success + example: process completed type: string payload: type: object type: object + network.TargetResult: + properties: + code: + example: 20000 + type: integer + id: + example: grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms + type: string + msg: + example: subscription success + type: string + type: object host: localhost:8080 info: contact: @@ -110,12 +218,12 @@ paths: - application/json description: 根据用户输入的字符串,从 Redis 中查询可能的测量点或结构路径,并提供推荐列表。 parameters: - - description: 查询输入参数,例如 'trans' 或 'transformfeeder1_220.' - in: body - name: request + - description: 推荐关键词,例如 'grid1' 或 'grid1.' + example: '"grid1"' + in: query + name: input required: true - schema: - $ref: '#/definitions/network.MeasurementRecommendRequest' + type: string produces: - application/json responses: @@ -160,4 +268,187 @@ paths: summary: load circuit diagram info tags: - load circuit_diagram + /monitors/data/realtime/stream/:clientID: + get: + description: 根据用户输入的clientID拉取对应的实时数据 + responses: {} + summary: 实时数据拉取 websocket api + tags: + - RealTime Component Websocket + /monitors/data/subscriptions: + post: + consumes: + - application/json + description: 根据用户输入的组件token,从 modelRT 服务中开始或结束对于量测节点的实时数据的订阅 + parameters: + - description: 量测节点实时数据订阅 + in: body + name: request + required: true + schema: + $ref: '#/definitions/network.RealTimeSubRequest' + produces: + - application/json + responses: + "2000": + description: 订阅实时数据结果列表 + schema: + allOf: + - $ref: '#/definitions/network.SuccessResponse' + - properties: + payload: + $ref: '#/definitions/network.RealTimeSubPayload' + type: object + "3000": + description: 订阅实时数据结果列表 + schema: + allOf: + - $ref: '#/definitions/network.FailureResponse' + - properties: + payload: + $ref: '#/definitions/network.RealTimeSubPayload' + type: object + summary: 开始或结束订阅实时数据 + tags: + - RealTime Component + /task/async: + post: + consumes: + - application/json + description: 创建新的异步任务并返回任务ID,任务将被提交到队列等待处理 + parameters: + - description: 任务创建请求 + in: body + name: request + required: true + schema: + $ref: '#/definitions/network.AsyncTaskCreateRequest' + produces: + - application/json + responses: + "200": + description: 任务创建成功 + schema: + allOf: + - $ref: '#/definitions/network.SuccessResponse' + - properties: + payload: + $ref: '#/definitions/network.AsyncTaskCreateResponse' + type: object + "400": + description: 请求参数错误 + schema: + $ref: '#/definitions/network.FailureResponse' + "500": + description: 服务器内部错误 + schema: + $ref: '#/definitions/network.FailureResponse' + summary: 创建异步任务 + tags: + - AsyncTask + /task/async/{task_id}: + get: + consumes: + - application/json + description: 根据任务ID查询异步任务的详细状态和结果 + parameters: + - description: 任务ID + in: path + name: task_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: 查询成功 + schema: + allOf: + - $ref: '#/definitions/network.SuccessResponse' + - properties: + payload: + $ref: '#/definitions/network.AsyncTaskResult' + type: object + "400": + description: 请求参数错误 + schema: + $ref: '#/definitions/network.FailureResponse' + "404": + description: 任务不存在 + schema: + $ref: '#/definitions/network.FailureResponse' + "500": + description: 服务器内部错误 + schema: + $ref: '#/definitions/network.FailureResponse' + summary: 查询异步任务详情 + tags: + - AsyncTask + /task/async/{task_id}/cancel: + post: + consumes: + - application/json + description: 取消指定ID的异步任务(如果任务尚未开始执行) + parameters: + - description: 任务ID + in: path + name: task_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: 任务取消成功 + schema: + $ref: '#/definitions/network.SuccessResponse' + "400": + description: 请求参数错误或任务无法取消 + schema: + $ref: '#/definitions/network.FailureResponse' + "404": + description: 任务不存在 + schema: + $ref: '#/definitions/network.FailureResponse' + "500": + description: 服务器内部错误 + schema: + $ref: '#/definitions/network.FailureResponse' + summary: 取消异步任务 + tags: + - AsyncTask + /task/async/results: + get: + consumes: + - application/json + description: 根据任务ID列表查询异步任务的状态和结果 + parameters: + - description: 任务ID列表,用逗号分隔 + in: query + name: task_ids + required: true + type: string + produces: + - application/json + responses: + "200": + description: 查询成功 + schema: + allOf: + - $ref: '#/definitions/network.SuccessResponse' + - properties: + payload: + $ref: '#/definitions/network.AsyncTaskResultQueryResponse' + type: object + "400": + description: 请求参数错误 + schema: + $ref: '#/definitions/network.FailureResponse' + "500": + description: 服务器内部错误 + schema: + $ref: '#/definitions/network.FailureResponse' + summary: 查询异步任务结果 + tags: + - AsyncTask swagger: "2.0" diff --git a/go.mod b/go.mod index 40347c5..a815516 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module modelRT -go 1.24 +go 1.25.0 require ( github.com/DATA-DOG/go-sqlmock v1.5.2 @@ -17,11 +17,16 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/v9 v9.7.3 github.com/spf13/viper v1.19.0 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/swaggo/swag v1.16.4 github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 + go.opentelemetry.io/contrib/propagators/b3 v1.43.0 + go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/trace v1.43.0 go.uber.org/zap v1.27.0 gorm.io/driver/mysql v1.5.7 gorm.io/driver/postgres v1.5.9 @@ -33,13 +38,16 @@ require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/bytedance/sonic v1.13.3 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect github.com/gin-contrib/sse v1.1.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/spec v0.21.0 // indirect @@ -49,6 +57,8 @@ require ( github.com/go-playground/validator/v10 v10.26.0 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect @@ -76,16 +86,23 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/arch v0.18.0 // indirect - golang.org/x/crypto v0.39.0 // indirect + golang.org/x/crypto v0.49.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect - golang.org/x/net v0.41.0 // indirect - golang.org/x/sync v0.15.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.26.0 // indirect - golang.org/x/tools v0.33.0 // indirect - google.golang.org/protobuf v1.36.6 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect + golang.org/x/tools v0.42.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/grpc v1.80.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 3b5290c..97e2fcf 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,10 @@ github.com/bytedance/sonic v1.13.3/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1 github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= @@ -42,6 +44,11 @@ github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= @@ -64,13 +71,19 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -126,8 +139,8 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= @@ -151,8 +164,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= @@ -168,6 +181,26 @@ github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2W github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/propagators/b3 v1.43.0 h1:CETqV3QLLPTy5yNrqyMr41VnAOOD4lsRved7n4QG00A= +go.opentelemetry.io/contrib/propagators/b3 v1.43.0/go.mod h1:Q4mCiCdziYzpNR0g+6UqVotAlCDZdzz6L8jwY4knOrw= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= @@ -178,24 +211,24 @@ golang.org/x/arch v0.18.0 h1:WN9poc33zL4AzGxqf8VtpKUnGvMi8O9lhNyBMF/85qc= golang.org/x/arch v0.18.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -203,8 +236,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -212,16 +245,24 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= -golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/handler/async_task_create_handler.go b/handler/async_task_create_handler.go index 6bf8a72..592460b 100644 --- a/handler/async_task_create_handler.go +++ b/handler/async_task_create_handler.go @@ -10,6 +10,8 @@ import ( "modelRT/task" "github.com/gin-gonic/gin" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) // AsyncTaskCreateHandler handles creation of asynchronous tasks @@ -67,13 +69,11 @@ func AsyncTaskCreateHandler(c *gin.Context) { // enqueue task to channel for async publishing to RabbitMQ msg := task.NewTaskQueueMessageWithPriority(asyncTask.TaskID, task.TaskType(request.TaskType), 5) - // propagate HTTP request trace so the async chain stays on the same traceID - if v, _ := ctx.Value(constants.CtxKeyTraceID).(string); v != "" { - msg.TraceID = v - } - if v, _ := ctx.Value(constants.CtxKeySpanID).(string); v != "" { - msg.SpanID = v - } + // propagate the current OTel span context so the async chain stays on the same trace + carrier := make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + msg.TraceCarrier = carrier + msg.Params = request.Params task.TaskMsgChan <- msg logger.Info(ctx, "task enqueued to channel", "task_id", asyncTask.TaskID, "queue", constants.TaskQueueName) @@ -102,13 +102,18 @@ func validateTaskParams(taskType string, params map[string]any) bool { } func validateTopologyAnalysisParams(params map[string]any) bool { - // Check required parameters for topology analysis - if startUUID, ok := params["start_uuid"]; !ok || startUUID == "" { + if v, ok := params["start_component_uuid"]; !ok || v == "" { return false } - if endUUID, ok := params["end_uuid"]; !ok || endUUID == "" { + if v, ok := params["end_component_uuid"]; !ok || v == "" { return false } + // check_in_service is optional; validate type when present + if v, exists := params["check_in_service"]; exists { + if _, isBool := v.(bool); !isBool { + return false + } + } return true } diff --git a/logger/logger.go b/logger/logger.go index 0d4317a..3b4175f 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -6,8 +6,7 @@ import ( "path" "runtime" - "modelRT/constants" - + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -22,9 +21,6 @@ type Logger interface { type logger struct { ctx context.Context - traceID string - spanID string - pSpanID string _logger *zap.Logger } @@ -56,10 +52,10 @@ func makeLogFields(ctx context.Context, kv ...any) []zap.Field { kv = append(kv, "unknown") } - traceID, _ := ctx.Value(constants.CtxKeyTraceID).(string) - spanID, _ := ctx.Value(constants.CtxKeySpanID).(string) - parentSpanID, _ := ctx.Value(constants.CtxKeyParentSpanID).(string) - kv = append(kv, "traceID", traceID, "spanID", spanID, "parentSpanID", parentSpanID) + spanCtx := trace.SpanFromContext(ctx).SpanContext() + traceID := spanCtx.TraceID().String() + spanID := spanCtx.SpanID().String() + kv = append(kv, "traceID", traceID, "spanID", spanID) funcName, file, line := getLoggerCallerInfo() kv = append(kv, "func", funcName, "file", file, "line", line) @@ -100,25 +96,11 @@ func getLoggerCallerInfo() (funcName, file string, line int) { return } -// New returns a logger bound to ctx. Trace fields (traceID, spanID, parentSpanID) -// are extracted from ctx using typed keys, and are included in every log entry. +// New returns a logger bound to ctx. Trace fields (traceID, spanID) are extracted +// from the OTel span stored in ctx and included in every log entry. func New(ctx context.Context) Logger { - var traceID, spanID, pSpanID string - if v, _ := ctx.Value(constants.CtxKeyTraceID).(string); v != "" { - traceID = v - } - if v, _ := ctx.Value(constants.CtxKeySpanID).(string); v != "" { - spanID = v - } - if v, _ := ctx.Value(constants.CtxKeyParentSpanID).(string); v != "" { - pSpanID = v - } - return &logger{ ctx: ctx, - traceID: traceID, - spanID: spanID, - pSpanID: pSpanID, _logger: GetLoggerInstance(), } } diff --git a/main.go b/main.go index 103eca9..4121a1f 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "modelRT/database" "modelRT/diagram" "modelRT/logger" + "modelRT/middleware" "modelRT/model" "modelRT/mq" "modelRT/pool" @@ -38,6 +39,7 @@ import ( "github.com/panjf2000/ants/v2" swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" + "go.opentelemetry.io/otel" "gorm.io/gorm" ) @@ -73,9 +75,6 @@ var ( func main() { flag.Parse() - startupSpanID := util.GenerateSpanID("startup") - ctx := context.WithValue(context.Background(), constants.CtxKeyTraceID, startupSpanID) - ctx = context.WithValue(ctx, constants.CtxKeySpanID, startupSpanID) configPath := filepath.Join(*modelRTConfigDir, *modelRTConfigName+"."+*modelRTConfigType) if _, err := os.Stat(configPath); os.IsNotExist(err) { @@ -101,6 +100,22 @@ func main() { logger.InitLoggerInstance(modelRTConfig.LoggerConfig) defer logger.GetLoggerInstance().Sync() + // init OTel TracerProvider + tp, tpErr := middleware.InitTracerProvider(context.Background(), modelRTConfig) + if tpErr != nil { + log.Printf("warn: OTLP tracer init failed, tracing disabled: %v", tpErr) + } + if tp != nil { + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + tp.Shutdown(shutdownCtx) + }() + } + + ctx, startupSpan := otel.Tracer("modelRT/main").Start(context.Background(), "startup") + defer startupSpan.End() + hostName, err := os.Hostname() if err != nil { logger.Error(ctx, "get host name failed", "error", err) @@ -226,7 +241,7 @@ func main() { } go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement) - tree, err := database.QueryTopologicFromDB(ctx, tx) + tree, _, err := database.QueryTopologicFromDB(ctx, tx) if err != nil { logger.Error(ctx, "load topologic info from postgres failed", "error", err) panic(err) @@ -285,3 +300,4 @@ func main() { } } } + diff --git a/middleware/trace.go b/middleware/trace.go index 604a08c..9712de8 100644 --- a/middleware/trace.go +++ b/middleware/trace.go @@ -1,41 +1,93 @@ -// Package middleware define gin framework middlewares +// Package middleware defines gin framework middlewares and OTel tracing infrastructure. package middleware import ( "bytes" "context" + "fmt" "io" "strings" "time" + "modelRT/config" "modelRT/constants" "modelRT/logger" - "modelRT/util" "github.com/gin-gonic/gin" + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" ) -// StartTrace define func of set trace info from request header -func StartTrace() gin.HandlerFunc { - return func(c *gin.Context) { - traceID := c.Request.Header.Get(constants.HeaderTraceID) - parentSpanID := c.Request.Header.Get(constants.HeaderSpanID) - spanID := util.GenerateSpanID(c.Request.RemoteAddr) - // if traceId is empty, it means it is the origin of the link. Set it to the spanId of this time. The originating spanId is the root spanId. - if traceID == "" { - // traceId identifies the entire request link, and spanId identifies the different services in the link. - traceID = spanID - } - c.Set(constants.HeaderTraceID, traceID) - c.Set(constants.HeaderSpanID, spanID) - c.Set(constants.HeaderParentSpanID, parentSpanID) +// InitTracerProvider creates an OTLP TracerProvider and registers it as the global provider. +// It also registers the B3 propagator to stay compatible with existing B3 infrastructure. +// The caller is responsible for calling Shutdown on the returned provider during graceful shutdown. +func InitTracerProvider(ctx context.Context, cfg config.ModelRTConfig) (*sdktrace.TracerProvider, error) { + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(cfg.OtelConfig.Endpoint), + } + if cfg.OtelConfig.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } - // also inject into request context so c.Request.Context() carries trace values - reqCtx := c.Request.Context() - reqCtx = context.WithValue(reqCtx, constants.CtxKeyTraceID, traceID) - reqCtx = context.WithValue(reqCtx, constants.CtxKeySpanID, spanID) - reqCtx = context.WithValue(reqCtx, constants.CtxKeyParentSpanID, parentSpanID) - c.Request = c.Request.WithContext(reqCtx) + exporter, err := otlptracehttp.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("create OTLP exporter: %w", err) + } + + res := sdkresource.NewSchemaless( + attribute.String("service.name", cfg.ServiceName), + attribute.String("deployment.environment", cfg.DeployEnv), + ) + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(b3.New()) + + return tp, nil +} + +// StartTrace extracts upstream B3 trace context from request headers and starts a server span. +// Typed context keys are also injected for backward compatibility with the existing logger +// until the logger is migrated to read from the OTel span context (Step 6). +func StartTrace() gin.HandlerFunc { + tracer := otel.Tracer("modelRT/http") + return func(c *gin.Context) { + // Extract upstream trace context from B3 headers (X-B3-TraceId etc.) + ctx := otel.GetTextMapPropagator().Extract( + c.Request.Context(), + propagation.HeaderCarrier(c.Request.Header), + ) + + spanName := c.FullPath() + if spanName == "" { + spanName = c.Request.URL.Path + } + ctx, span := tracer.Start(ctx, spanName, + oteltrace.WithSpanKind(oteltrace.SpanKindServer), + ) + defer span.End() + + // backward compat: inject typed keys so existing logger reads work until Step 6 + spanCtx := span.SpanContext() + ctx = context.WithValue(ctx, constants.CtxKeyTraceID, spanCtx.TraceID().String()) + ctx = context.WithValue(ctx, constants.CtxKeySpanID, spanCtx.SpanID().String()) + + c.Request = c.Request.WithContext(ctx) + + // set in gin context for accessLog (logger.New(c) reads via gin.Context.Value) + c.Set(constants.HeaderTraceID, spanCtx.TraceID().String()) + c.Set(constants.HeaderSpanID, spanCtx.SpanID().String()) c.Next() } diff --git a/model/redis_recommend.go b/model/redis_recommend.go index e0ac507..ddfef2a 100644 --- a/model/redis_recommend.go +++ b/model/redis_recommend.go @@ -550,7 +550,6 @@ func handleLevelFuzzySearch(ctx context.Context, rdb *redis.Client, hierarchy co IsFuzzy: true, Err: nil, } - return } // runFuzzySearch define func to process redis fuzzy search diff --git a/network/async_task_request.go b/network/async_task_request.go index 9534b76..bf6a55a 100644 --- a/network/async_task_request.go +++ b/network/async_task_request.go @@ -62,8 +62,9 @@ type AsyncTaskStatusUpdate struct { // TopologyAnalysisParams defines the parameters for topology analysis task type TopologyAnalysisParams struct { - StartUUID string `json:"start_uuid" example:"comp-001" description:"起始元件UUID"` - EndUUID string `json:"end_uuid" example:"comp-999" description:"目标元件UUID"` + StartComponentUUID string `json:"start_component_uuid" example:"550e8400-e29b-41d4-a716-446655440000" description:"起始元件UUID"` + EndComponentUUID string `json:"end_component_uuid" example:"550e8400-e29b-41d4-a716-446655440001" description:"目标元件UUID"` + CheckInService bool `json:"check_in_service" example:"true" description:"是否检查路径上元件的投运状态,默认为true"` } // PerformanceAnalysisParams defines the parameters for performance analysis task diff --git a/network/circuit_diagram_update_request.go b/network/circuit_diagram_update_request.go index 86b1a76..76ed6c2 100644 --- a/network/circuit_diagram_update_request.go +++ b/network/circuit_diagram_update_request.go @@ -158,7 +158,7 @@ func ConvertComponentUpdateInfosToComponents(updateInfo ComponentUpdateInfo) (*o // Op: info.Op, // Tag: info.Tag, // 其他字段可根据需要补充 - Ts: time.Now(), + TS: time.Now(), } return component, nil } diff --git a/orm/async_motor.go b/orm/async_motor.go index 017e46e..48547ec 100644 --- a/orm/async_motor.go +++ b/orm/async_motor.go @@ -85,7 +85,6 @@ func (a *AsyncMotor) TableName() string { // SetComponentID func implement BasicModelInterface interface func (a *AsyncMotor) SetComponentID(componentID int64) { a.ComponentID = componentID - return } // ReturnTableName func implement BasicModelInterface interface diff --git a/orm/busbar_section.go b/orm/busbar_section.go index 764057f..d953d2f 100644 --- a/orm/busbar_section.go +++ b/orm/busbar_section.go @@ -72,7 +72,6 @@ func (b *BusbarSection) TableName() string { // SetComponentID func implement BasicModelInterface interface func (b *BusbarSection) SetComponentID(componentID int64) { b.ComponentID = componentID - return } // ReturnTableName func implement BasicModelInterface interface diff --git a/orm/circuit_diagram_bay.go b/orm/circuit_diagram_bay.go index 4554bc3..4396219 100644 --- a/orm/circuit_diagram_bay.go +++ b/orm/circuit_diagram_bay.go @@ -34,7 +34,7 @@ type Bay struct { DevEtc JSONMap `gorm:"column:dev_etc;type:jsonb;not null;default:'[]'"` Components []uuid.UUID `gorm:"column:components;type:uuid[];not null;default:'{}'"` Op int `gorm:"column:op;not null;default:-1"` - Ts time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` + TS time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` } // TableName func respresent return table name of Bay diff --git a/orm/circuit_diagram_component.go b/orm/circuit_diagram_component.go index df41bc5..a292c6a 100644 --- a/orm/circuit_diagram_component.go +++ b/orm/circuit_diagram_component.go @@ -27,7 +27,7 @@ type Component struct { Label JSONMap `gorm:"column:label;type:jsonb;not null;default:'{}'"` Context JSONMap `gorm:"column:context;type:jsonb;not null;default:'{}'"` Op int `gorm:"column:op;not null;default:-1"` - Ts time.Time `gorm:"column:ts;type:timestamptz;not null;default:current_timestamp;autoCreateTime"` + TS time.Time `gorm:"column:ts;type:timestamptz;not null;default:current_timestamp;autoCreateTime"` } // TableName func respresent return table name of Component diff --git a/orm/circuit_diagram_grid.go b/orm/circuit_diagram_grid.go index 43d90e5..86da8a8 100644 --- a/orm/circuit_diagram_grid.go +++ b/orm/circuit_diagram_grid.go @@ -12,7 +12,7 @@ type Grid struct { Name string `gorm:"column:name"` Description string `gorm:"column:description"` Op int `gorm:"column:op"` - Ts time.Time `gorm:"column:ts"` + TS time.Time `gorm:"column:ts"` } // TableName func respresent return table name of Grid diff --git a/orm/circuit_diagram_measurement.go b/orm/circuit_diagram_measurement.go index 4abc441..e281559 100644 --- a/orm/circuit_diagram_measurement.go +++ b/orm/circuit_diagram_measurement.go @@ -20,7 +20,7 @@ type Measurement struct { BayUUID uuid.UUID `gorm:"column:bay_uuid;type:uuid;not null"` ComponentUUID uuid.UUID `gorm:"column:component_uuid;type:uuid;not null"` Op int `gorm:"column:op;not null;default:-1"` - Ts time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` + TS time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` } // TableName func respresent return table name of Measurement diff --git a/orm/circuit_diagram_page.go b/orm/circuit_diagram_page.go index 172a693..2e3c50a 100644 --- a/orm/circuit_diagram_page.go +++ b/orm/circuit_diagram_page.go @@ -12,7 +12,7 @@ type Page struct { Context JSONMap `gorm:"column:context;type:jsonb;default:'{}'"` Description string `gorm:"column:description"` Op int `gorm:"column:op"` - Ts time.Time `gorm:"column:ts"` + TS time.Time `gorm:"column:ts"` } // TableName func respresent return table name of Page diff --git a/orm/circuit_diagram_station.go b/orm/circuit_diagram_station.go index e937257..5059e97 100644 --- a/orm/circuit_diagram_station.go +++ b/orm/circuit_diagram_station.go @@ -14,7 +14,7 @@ type Station struct { Description string `gorm:"column:description"` IsLocal bool `gorm:"column:is_local"` Op int `gorm:"column:op"` - Ts time.Time `gorm:"column:ts"` + TS time.Time `gorm:"column:ts"` } // TableName func respresent return table name of Station diff --git a/orm/circuit_diagram_topologic.go b/orm/circuit_diagram_topologic.go index d85137b..7f444e6 100644 --- a/orm/circuit_diagram_topologic.go +++ b/orm/circuit_diagram_topologic.go @@ -16,7 +16,7 @@ type Topologic struct { Flag int `gorm:"column:flag"` Description string `gorm:"column:description;size:512;not null;default:''"` Op int `gorm:"column:op;not null;default:-1"` - Ts time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` + TS time.Time `gorm:"column:ts;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` } // TableName func respresent return table name of Page diff --git a/orm/circuit_diagram_zone.go b/orm/circuit_diagram_zone.go index f50aaee..85caf39 100644 --- a/orm/circuit_diagram_zone.go +++ b/orm/circuit_diagram_zone.go @@ -13,7 +13,7 @@ type Zone struct { Name string `gorm:"column:name"` Description string `gorm:"column:description"` Op int `gorm:"column:op"` - Ts time.Time `gorm:"column:ts"` + TS time.Time `gorm:"column:ts"` } // TableName func respresent return table name of Zone diff --git a/orm/demo.go b/orm/demo.go index 8b0991b..2483066 100644 --- a/orm/demo.go +++ b/orm/demo.go @@ -14,7 +14,7 @@ type Demo struct { UIAlarm float32 `gorm:"column:ui_alarm" json:"ui_alarm"` // 低电流告警值 OIAlarm float32 `gorm:"column:oi_alarm" json:"oi_alarm"` // 高电流告警值 Op int `gorm:"column:op" json:"op"` // 操作人 ID - Ts time.Time `gorm:"column:ts" json:"ts"` // 操作时间 + TS time.Time `gorm:"column:ts" json:"ts"` // 操作时间 } // TableName func respresent return table name of busbar section @@ -25,7 +25,6 @@ func (d *Demo) TableName() string { // SetComponentID func implement BasicModelInterface interface func (d *Demo) SetComponentID(componentID int64) { d.ComponentID = componentID - return } // ReturnTableName func implement BasicModelInterface interface diff --git a/task/handler_factory.go b/task/handler_factory.go index 1efdf65..ae1d951 100644 --- a/task/handler_factory.go +++ b/task/handler_factory.go @@ -5,8 +5,11 @@ import ( "context" "fmt" "sync" + "time" + "modelRT/database" "modelRT/logger" + "modelRT/orm" "github.com/gofrs/uuid" "gorm.io/gorm" @@ -14,8 +17,8 @@ import ( // TaskHandler defines the interface for task processors type TaskHandler interface { - // Execute processes a task with the given ID and type - Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error + // Execute processes a task with the given ID, type, and params from the MQ message + Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error // CanHandle returns true if this handler can process the given task type CanHandle(taskType TaskType) bool // Name returns the name of the handler for logging and metrics @@ -95,26 +98,205 @@ func NewTopologyAnalysisHandler() *TopologyAnalysisHandler { } } -// Execute processes a topology analysis task -func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { - logger.Info(ctx, "Starting topology analysis", +// Execute processes a topology analysis task. +// Params (all sourced from the MQ message, no DB lookup needed): +// - start_component_uuid (string, required): BFS origin +// - end_component_uuid (string, required): reachability target +// - check_in_service (bool, optional, default true): skip out-of-service components +func (h *TopologyAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error { + logger.Info(ctx, "topology analysis started", "task_id", taskID) + + // Phase 1: parse params from MQ message + startComponentUUID, endComponentUUID, checkInService, err := parseTopologyAnalysisParams(params) + if err != nil { + return fmt.Errorf("invalid topology analysis params: %w", err) + } + + logger.Info(ctx, "topology params parsed", "task_id", taskID, - "task_type", taskType, + "start", startComponentUUID, + "end", endComponentUUID, + "check_in_service", checkInService, ) - // TODO: Implement actual topology analysis logic - // This would typically involve: - // 1. Fetching task parameters from database - // 2. Performing topology analysis (checking for islands, shorts, etc.) - // 3. Storing results in database - // 4. Updating task status + if err := database.UpdateAsyncTaskProgress(ctx, db, taskID, 20); err != nil { + logger.Warn(ctx, "update progress failed", "task_id", taskID, "progress", 20, "error", err) + } - // Simulate work - logger.Info(ctx, "Topology analysis completed", + // Phase 2: query topology edges from startComponentUUID, build adjacency list + topoEdges, err := database.QueryTopologicByStartUUID(ctx, db, startComponentUUID) + if err != nil { + return fmt.Errorf("query topology from start node: %w", err) + } + + // adjacency list: uuid_from → []uuid_to + adjMap := make(map[uuid.UUID][]uuid.UUID, len(topoEdges)) + // collect all UUIDs for batch InService query + allUUIDs := make(map[uuid.UUID]struct{}, len(topoEdges)*2) + allUUIDs[startComponentUUID] = struct{}{} + for _, edge := range topoEdges { + adjMap[edge.UUIDFrom] = append(adjMap[edge.UUIDFrom], edge.UUIDTo) + allUUIDs[edge.UUIDFrom] = struct{}{} + allUUIDs[edge.UUIDTo] = struct{}{} + } + + if err := database.UpdateAsyncTaskProgress(ctx, db, taskID, 40); err != nil { + logger.Warn(ctx, "update progress failed", "task_id", taskID, "progress", 40, "error", err) + } + + // Phase 3: batch-load InService status (only when checkInService is true) + inServiceMap := make(map[uuid.UUID]bool) + if checkInService { + uuidSlice := make([]uuid.UUID, 0, len(allUUIDs)) + for id := range allUUIDs { + uuidSlice = append(uuidSlice, id) + } + inServiceMap, err = database.QueryComponentsInServiceByUUIDs(ctx, db, uuidSlice) + if err != nil { + return fmt.Errorf("query component in_service status: %w", err) + } + + // check the start node itself before BFS + if !inServiceMap[startComponentUUID] { + return persistTopologyResult(ctx, db, taskID, startComponentUUID, endComponentUUID, + checkInService, false, nil, &startComponentUUID) + } + } + + if err := database.UpdateAsyncTaskProgress(ctx, db, taskID, 60); err != nil { + logger.Warn(ctx, "update progress failed", "task_id", taskID, "progress", 60, "error", err) + } + + // Phase 4: BFS reachability check + visited := make(map[uuid.UUID]struct{}) + parent := make(map[uuid.UUID]uuid.UUID) // for path reconstruction + queue := []uuid.UUID{startComponentUUID} + visited[startComponentUUID] = struct{}{} + isReachable := false + var blockedBy *uuid.UUID + + for len(queue) > 0 { + cur := queue[0] + queue = queue[1:] + + if cur == endComponentUUID { + isReachable = true + break + } + + for _, next := range adjMap[cur] { + if _, seen := visited[next]; seen { + continue + } + if checkInService && !inServiceMap[next] { + // record first out-of-service blocker but keep searching other branches + if blockedBy == nil { + id := next + blockedBy = &id + } + continue + } + visited[next] = struct{}{} + parent[next] = cur + queue = append(queue, next) + } + } + + if err := database.UpdateAsyncTaskProgress(ctx, db, taskID, 80); err != nil { + logger.Warn(ctx, "update progress failed", "task_id", taskID, "progress", 80, "error", err) + } + + // Phase 5: reconstruct path (if reachable) and persist result + var path []uuid.UUID + if isReachable { + blockedBy = nil // reachable path found — clear any partial blocker + path = reconstructPath(parent, startComponentUUID, endComponentUUID) + } + + return persistTopologyResult(ctx, db, taskID, startComponentUUID, endComponentUUID, + checkInService, isReachable, path, blockedBy) +} + +// parseTopologyAnalysisParams extracts and validates the three required fields. +// check_in_service defaults to true when absent. +func parseTopologyAnalysisParams(params map[string]any) (startID, endID uuid.UUID, checkInService bool, err error) { + startStr, ok := params["start_component_uuid"].(string) + if !ok || startStr == "" { + err = fmt.Errorf("missing or invalid start_component_uuid") + return + } + endStr, ok := params["end_component_uuid"].(string) + if !ok || endStr == "" { + err = fmt.Errorf("missing or invalid end_component_uuid") + return + } + startID, err = uuid.FromString(startStr) + if err != nil { + err = fmt.Errorf("parse start_component_uuid %q: %w", startStr, err) + return + } + endID, err = uuid.FromString(endStr) + if err != nil { + err = fmt.Errorf("parse end_component_uuid %q: %w", endStr, err) + return + } + + // check_in_service defaults to true + checkInService = true + if v, exists := params["check_in_service"]; exists { + if b, isBool := v.(bool); isBool { + checkInService = b + } + } + return +} + +// reconstructPath walks the parent map backwards from end to start. +func reconstructPath(parent map[uuid.UUID]uuid.UUID, start, end uuid.UUID) []uuid.UUID { + var path []uuid.UUID + for cur := end; cur != start; cur = parent[cur] { + path = append(path, cur) + } + path = append(path, start) + // reverse: path was built end→start + for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 { + path[i], path[j] = path[j], path[i] + } + return path +} + +// persistTopologyResult serialises the analysis outcome and writes it to async_task_result. +func persistTopologyResult( + ctx context.Context, db *gorm.DB, taskID uuid.UUID, + startID, endID uuid.UUID, checkInService, isReachable bool, + path []uuid.UUID, blockedBy *uuid.UUID, +) error { + pathStrs := make([]string, 0, len(path)) + for _, id := range path { + pathStrs = append(pathStrs, id.String()) + } + + result := orm.JSONMap{ + "start_component_uuid": startID.String(), + "end_component_uuid": endID.String(), + "check_in_service": checkInService, + "is_reachable": isReachable, + "path": pathStrs, + "computed_at": time.Now().Unix(), + } + if blockedBy != nil { + result["blocked_by"] = blockedBy.String() + } + + if err := database.CreateAsyncTaskResult(ctx, db, taskID, result); err != nil { + return fmt.Errorf("save task result: %w", err) + } + + logger.Info(ctx, "topology analysis completed", "task_id", taskID, - "task_type", taskType, + "is_reachable", isReachable, + "path_length", len(path), ) - return nil } @@ -136,7 +318,7 @@ func NewEventAnalysisHandler() *EventAnalysisHandler { } // Execute processes an event analysis task -func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { +func (h *EventAnalysisHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error { logger.Info(ctx, "Starting event analysis", "task_id", taskID, "task_type", taskType, @@ -176,7 +358,7 @@ func NewBatchImportHandler() *BatchImportHandler { } // Execute processes a batch import task -func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { +func (h *BatchImportHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error { logger.Info(ctx, "Starting batch import", "task_id", taskID, "task_type", taskType, @@ -214,13 +396,13 @@ func NewCompositeHandler(factory *HandlerFactory) *CompositeHandler { } // Execute delegates task execution to the appropriate handler -func (h *CompositeHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { +func (h *CompositeHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error { handler, err := h.factory.GetHandler(taskType) if err != nil { return fmt.Errorf("failed to get handler for task type %s: %w", taskType, err) } - return handler.Execute(ctx, taskID, taskType, db) + return handler.Execute(ctx, taskID, taskType, params, db) } // CanHandle returns true if any registered handler can handle the task type diff --git a/task/queue_message.go b/task/queue_message.go index 95b717a..9cc5ebf 100644 --- a/task/queue_message.go +++ b/task/queue_message.go @@ -11,11 +11,11 @@ import ( // TaskQueueMessage defines minimal message structure for RabbitMQ/Redis queue dispatch // This struct is designed to be lightweight for efficient message transport type TaskQueueMessage struct { - TaskID uuid.UUID `json:"task_id"` - TaskType TaskType `json:"task_type"` - Priority int `json:"priority,omitempty"` // Optional, defaults to constants.TaskPriorityDefault - TraceID string `json:"trace_id,omitempty"` // propagated from the originating HTTP request - SpanID string `json:"span_id,omitempty"` // spanID of the step that enqueued this message + TaskID uuid.UUID `json:"task_id"` + TaskType TaskType `json:"task_type"` + Priority int `json:"priority,omitempty"` // Optional, defaults to constants.TaskPriorityDefault + TraceCarrier map[string]string `json:"trace_carrier,omitempty"` // OTel propagation carrier (B3 headers) + Params map[string]any `json:"params,omitempty"` // Task-specific parameters, set by the HTTP handler } // NewTaskQueueMessage creates a new TaskQueueMessage with default priority diff --git a/task/queue_producer.go b/task/queue_producer.go index 97e6da7..bcaaffc 100644 --- a/task/queue_producer.go +++ b/task/queue_producer.go @@ -11,10 +11,13 @@ import ( "modelRT/constants" "modelRT/logger" "modelRT/mq" - "modelRT/util" "github.com/gofrs/uuid" amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" ) // TaskMsgChan buffers task messages to be published to RabbitMQ asynchronously @@ -115,6 +118,11 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT return fmt.Errorf("invalid task message: taskID=%s, taskType=%s", taskID, taskType) } + // Inject OTel trace context so the consumer (worker) can restore the span chain + carrier := make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + message.TraceCarrier = carrier + // Convert message to JSON body, err := json.Marshal(message) if err != nil { @@ -242,17 +250,17 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan logger.Info(ctx, "task channel closed, exiting push loop") return } - traceID := msg.TraceID - if traceID == "" { - traceID = msg.TaskID.String() // fallback when no HTTP trace was propagated - } - taskCtx := context.WithValue(ctx, constants.CtxKeyTraceID, traceID) - taskCtx = context.WithValue(taskCtx, constants.CtxKeySpanID, util.GenerateSpanID("task-publish")) - taskCtx = context.WithValue(taskCtx, constants.CtxKeyParentSpanID, msg.SpanID) + // Restore trace context from the handler that enqueued this message + taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) + taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish", + oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())), + ) if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, 3); err != nil { + pubSpan.RecordError(err) logger.Error(taskCtx, "publish task to RabbitMQ failed", "task_id", msg.TaskID, "error", err) } + pubSpan.End() } } } \ No newline at end of file diff --git a/task/test_task.go b/task/test_task.go index 4b38ac9..d85b682 100644 --- a/task/test_task.go +++ b/task/test_task.go @@ -141,26 +141,20 @@ func NewTestTaskHandler() *TestTaskHandler { } // Execute processes a test task using the unified task interface -func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, db *gorm.DB) error { +func (h *TestTaskHandler) Execute(ctx context.Context, taskID uuid.UUID, taskType TaskType, params map[string]any, db *gorm.DB) error { logger.Info(ctx, "Executing test task", "task_id", taskID, "task_type", taskType, ) - // Fetch task parameters from database - asyncTask, err := database.GetAsyncTaskByID(ctx, db, taskID) - if err != nil { - return fmt.Errorf("failed toser fetch task: %w", err) - } - - // Convert params map to TestTaskParams - params := &TestTaskParams{} - if err := params.FromMap(map[string]interface{}(asyncTask.Params)); err != nil { + // Convert params from MQ message to TestTaskParams + taskParams := &TestTaskParams{} + if err := taskParams.FromMap(params); err != nil { return fmt.Errorf("failed to parse task params: %w", err) } // Create and execute test task - testTask := NewTestTask(*params) + testTask := NewTestTask(*taskParams) return testTask.Execute(ctx, taskID, db) } diff --git a/task/worker.go b/task/worker.go index 6a47b56..280b217 100644 --- a/task/worker.go +++ b/task/worker.go @@ -14,11 +14,14 @@ import ( "modelRT/logger" "modelRT/mq" "modelRT/orm" - "modelRT/util" "github.com/gofrs/uuid" "github.com/panjf2000/ants/v2" amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" "gorm.io/gorm" ) @@ -283,14 +286,15 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { return } - // derive a per-task context carrying the trace propagated from the originating HTTP request - traceID := taskMsg.TraceID - if traceID == "" { - traceID = taskMsg.TaskID.String() // fallback when message carries no trace - } - taskCtx := context.WithValue(ctx, constants.CtxKeyTraceID, traceID) - taskCtx = context.WithValue(taskCtx, constants.CtxKeySpanID, util.GenerateSpanID("task-worker")) - taskCtx = context.WithValue(taskCtx, constants.CtxKeyParentSpanID, taskMsg.SpanID) + // Restore trace context from the publish span, then create an execute child span + taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(taskMsg.TraceCarrier)) + taskCtx, span := otel.Tracer("modelRT/task").Start(taskCtx, "task.execute", + oteltrace.WithAttributes( + attribute.String("task_id", taskMsg.TaskID.String()), + attribute.String("task_type", string(taskMsg.TaskType)), + ), + ) + defer span.End() ctx = taskCtx logger.Info(ctx, "Processing task", @@ -312,7 +316,7 @@ func (w *TaskWorker) handleMessage(msg amqp.Delivery) { // Execute task using handler startTime := time.Now() - err := w.handler.Execute(ctx, taskMsg.TaskID, taskMsg.TaskType, w.db) + err := w.handler.Execute(ctx, taskMsg.TaskID, taskMsg.TaskType, taskMsg.Params, w.db) processingTime := time.Since(startTime) if err != nil {