refactor: modernize Go idioms and add MongoDB K8s manifests

- replace interface{} with any across ~30 files for Go 1.18+ style
  - adopt for-range-over-int loops in place of explicit index loops
  - use maps.Copy from stdlib to replace manual map copy loops
  - use min() builtin for exponential backoff delay cap in retry_manager
  - add MongoDB 7.0 K8s manifests (StatefulSet, Service, PVC, Secret)
  - document PostgreSQL and MongoDB deploy steps in deploy.md with SSH tunnel port mappings
This commit is contained in:
douxu 2026-05-29 14:28:58 +08:00
parent bacd43617e
commit 57d1111a83
39 changed files with 307 additions and 116 deletions

View File

@ -139,10 +139,10 @@ func (e *AppError) SetMsg(msg string) *AppError {
}
type formattedErr struct {
Code int `json:"code"`
Msg string `json:"msg"`
Cause interface{} `json:"cause"`
Occurred string `json:"occurred"`
Code int `json:"code"`
Msg string `json:"msg"`
Cause any `json:"cause"`
Occurred string `json:"occurred"`
}
// toStructuredError define func convert AppError to structured error for better readability

View File

@ -42,7 +42,7 @@ var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
}
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]any) (func(archorValue float64, args ...float64) float64, []float64) {
if componentType == constants.DemoType {
switch anchorName {
case "voltage":

View File

@ -1,4 +1,4 @@
# 项目依赖服务部署指南
# 项目服务部署指南
本项目依赖于 `PostgreSQL` 数据库和 `Redis Stack Server`(包含 `Redisearch` 等模块)部署文档将使用 `Docker` 容器化技术部署这两个依赖服务
@ -679,6 +679,108 @@ kubectl apply -f deploy/k8s/rabbitmq-service.yaml
> **注意:** 证书认证用户的 `password_hash` 留空RabbitMQ 通过 `ssl_cert_login_from = common_name` 将证书 CN 映射为用户名。
#### 4.4 部署 PostgreSQL
```bash
kubectl apply -f deploy/k8s/pg-configmap.yaml
kubectl apply -f deploy/k8s/pg-pvc.yaml
kubectl apply -f deploy/k8s/pg-statefulset.yaml
kubectl apply -f deploy/k8s/pg-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `postgres:13.16` | PostgreSQL 13.16 |
| **NodePort** | `30432` | 集群外访问端口 |
| **数据库** | `demo` | ConfigMap 中 `POSTGRES_DB` |
| **用户名** | `postgres` | ConfigMap 中 `POSTGRES_USER` |
| **密码** | `coslight` | ConfigMap `postgres-config` 中配置,生产环境迁移至 Secret |
| **存储** | `2Gi` | PVC `postgres-data` |
##### 4.4.1 等待 Pod 就绪
```bash
kubectl wait --for=condition=ready pod -l app=postgres --timeout=120s
```
##### 4.4.2 初始化异步任务表
PostgreSQL 就绪后执行 1.4 节的建表 SQL可通过以下方式进入容器执行
```bash
# 交互式 psql
kubectl exec -it $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -d demo
# 或将 SQL 文件通过管道一次性执行
kubectl exec -i $(kubectl get pod -l app=postgres -o jsonpath='{.items[0].metadata.name}') \
-- psql -U postgres -d demo < /path/to/init.sql
```
##### 4.4.3 状态检查
```bash
kubectl get pods -l app=postgres
kubectl logs -l app=postgres --tail=30
```
##### 4.4.4 清理
```bash
kubectl delete -f deploy/k8s/pg-service.yaml \
-f deploy/k8s/pg-statefulset.yaml \
-f deploy/k8s/pg-pvc.yaml \
-f deploy/k8s/pg-configmap.yaml
```
#### 4.5 部署 MongoDB
```bash
kubectl apply -f deploy/k8s/mongodb-secret.yaml
kubectl apply -f deploy/k8s/mongodb-pvc.yaml
kubectl apply -f deploy/k8s/mongodb-statefulset.yaml
kubectl apply -f deploy/k8s/mongodb-service.yaml
```
| 参数 | 值 | 说明 |
| :--- | :--- | :--- |
| **镜像** | `mongo:7.0` | MongoDB 7.0 |
| **NodePort** | `30017` | 集群外访问端口 |
| **用户名** | `admin` | Root 管理员 |
| **密码** | `coslight` | Secret `mongodb-secret` 中配置,生产环境请替换强密码 |
| **存储** | `2Gi` | PVC `mongodb-data` |
> **注意:** 密码存储在 `mongodb-secret.yaml``stringData` 中,生产环境应替换为强密码,并避免将明文密码提交至版本库。
##### 4.5.1 等待 Pod 就绪
```bash
kubectl wait --for=condition=ready pod -l app=mongodb --timeout=120s
```
##### 4.5.2 连接验证
```bash
kubectl exec -it $(kubectl get pod -l app=mongodb -o jsonpath='{.items[0].metadata.name}') \
-- mongosh -u admin -p coslight --authenticationDatabase admin
```
##### 4.5.3 状态检查
```bash
kubectl get pods -l app=mongodb
kubectl logs -l app=mongodb --tail=30
```
##### 4.5.4 清理
```bash
kubectl delete -f deploy/k8s/mongodb-service.yaml \
-f deploy/k8s/mongodb-statefulset.yaml \
-f deploy/k8s/mongodb-pvc.yaml \
-f deploy/k8s/mongodb-secret.yaml
```
### 5\. 部署 ModelRTKubernetes
所有资源部署在 `default` 命名空间YAML 文件位于 `deploy/k8s/`
@ -852,7 +954,9 @@ Mac 本地端口 ──SSH隧道──▶ Ubuntu 宿主机 (192.168.1.101)
#### 7.2 建立隧道
```bash
ssh -L 5671:192.168.49.2:30671 \
ssh -L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
-L 4318:192.168.49.2:31318 \
@ -866,6 +970,8 @@ ssh -L 5671:192.168.49.2:30671 \
```bash
ssh -fN \
-L 5432:192.168.49.2:30432 \
-L 27017:192.168.49.2:30017 \
-L 5671:192.168.49.2:30671 \
-L 15671:192.168.49.2:31671 \
-L 6379:192.168.49.2:30001 \
@ -880,6 +986,8 @@ ssh -fN \
| Mac 本地端口 | Minikube NodePort | 服务 | 说明 |
| :--- | :--- | :--- | :--- |
| `5432` | `30432` | PostgreSQL | 数据库连接 `localhost:5432` |
| `27017` | `30017` | MongoDB | 数据库连接 `localhost:27017` |
| `5671` | `30671` | RabbitMQ AMQP | ModelRT / EventRT 消息队列连接 |
| `15671` | `31671` | RabbitMQ Management | RabbitMQ 管理界面 `http://localhost:15671` |
| `6379` | `30001` | Redis | 分布式锁 / 数据存储 |

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mongodb-data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: mongodb-secret
type: Opaque
stringData:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: coslight

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: mongodb
labels:
app: mongodb
spec:
type: NodePort
selector:
app: mongodb
ports:
- name: mongodb
port: 27017
targetPort: 27017
nodePort: 30017

View File

@ -0,0 +1,61 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: mongodb
labels:
app: mongodb
spec:
serviceName: mongodb
replicas: 1
selector:
matchLabels:
app: mongodb
template:
metadata:
labels:
app: mongodb
spec:
containers:
- name: mongodb
image: mongo:7.0
imagePullPolicy: IfNotPresent
ports:
- name: mongodb
containerPort: 27017
envFrom:
- secretRef:
name: mongodb-secret
volumeMounts:
- name: mongodb-data
mountPath: /data/db
readinessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 12
livenessProbe:
exec:
command:
- mongosh
- --eval
- "db.adminCommand('ping')"
initialDelaySeconds: 30
periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 3
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
volumes:
- name: mongodb-data
persistentVolumeClaim:
claimName: mongodb-data

View File

@ -129,9 +129,9 @@ func generateOutlierSegments(totalSize, minLength, maxLength, count int, distrib
segments := make([]OutlierSegment, 0, count)
usedPositions := make(map[int]bool)
for i := 0; i < count; i++ {
for range count {
// 尝试多次寻找合适的位置
for attempt := 0; attempt < 10; attempt++ {
for range 10 {
length := rand.Intn(maxLength-minLength+1) + minLength
start := rand.Intn(totalSize - length)

View File

@ -18,7 +18,7 @@ func TestHMSet(t *testing.T) {
PoolSize: 50,
DialTimeout: 10 * time.Second,
})
params := map[string]interface{}{
params := map[string]any{
"field1": "Hello1",
"field2": "World1",
"field3": 11,

View File

@ -53,7 +53,7 @@ func (n *MultiBranchTreeNode) FindNodeByID(id uuid.UUID) *MultiBranchTreeNode {
}
func (n *MultiBranchTreeNode) PrintTree(level int) {
for i := 0; i < level; i++ {
for range level {
fmt.Print(" ")
}

View File

@ -29,7 +29,7 @@ func NewRedisHash(ctx context.Context, hashKey string, lockLeaseTime uint64, nee
}
// SetRedisHashByMap define func of set redis hash by map struct
func (rh *RedisHash) SetRedisHashByMap(fields map[string]interface{}) error {
func (rh *RedisHash) SetRedisHashByMap(fields map[string]any) error {
err := rh.rwLocker.WLock(rh.ctx)
if err != nil {
logger.Error(rh.ctx, "lock wLock by hash_key failed", "hash_key", rh.hashKey, "error", err)
@ -46,7 +46,7 @@ func (rh *RedisHash) SetRedisHashByMap(fields map[string]interface{}) error {
}
// SetRedisHashByKV define func of set redis hash by kv struct
func (rh *RedisHash) SetRedisHashByKV(field string, value interface{}) error {
func (rh *RedisHash) SetRedisHashByKV(field string, value any) error {
err := rh.rwLocker.WLock(rh.ctx)
if err != nil {
logger.Error(rh.ctx, "lock wLock by hash_key failed", "hash_key", rh.hashKey, "error", err)

View File

@ -46,7 +46,7 @@ func (rs *RedisString) Get(stringKey string) (string, error) {
}
// Set define func of set the value of key
func (rs *RedisString) Set(stringKey string, value interface{}) error {
func (rs *RedisString) Set(stringKey string, value any) error {
err := rs.rwLocker.WLock(rs.ctx)
if err != nil {
logger.Error(rs.ctx, "lock wLock by stringKey failed", "string_key", stringKey, "error", err)

View File

@ -30,7 +30,7 @@ func NewRedisZSet(ctx context.Context, key string, lockLeaseTime uint64, needRef
}
// ZADD define func of add redis zset by members
func (rs *RedisZSet) ZADD(setKey string, score float64, member interface{}) error {
func (rs *RedisZSet) ZADD(setKey string, score float64, member any) error {
err := rs.rwLocker.WLock(rs.ctx)
if err != nil {
logger.Error(rs.ctx, "lock wLock by setKey failed", "set_key", setKey, "error", err)

View File

@ -12,7 +12,7 @@ var graphOverview sync.Map
// PrintGrapMap define func of print circuit diagram topologic info data
func PrintGrapMap() {
graphOverview.Range(func(key, value interface{}) bool {
graphOverview.Range(func(key, value any) bool {
fmt.Println(key, value)
return true
})

View File

@ -68,7 +68,7 @@ func ComponentAnchorReplaceHandler(c *gin.Context) {
resp := network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": request.UUID,
},
}

View File

@ -127,7 +127,7 @@ func validatePerformanceAnalysisParams(params map[string]any) bool {
// Check required parameters for performance analysis
if componentIDs, ok := params["component_ids"]; !ok {
return false
} else if ids, isSlice := componentIDs.([]interface{}); !isSlice || len(ids) == 0 {
} else if ids, isSlice := componentIDs.([]any); !isSlice || len(ids) == 0 {
return false
}
return true

View File

@ -41,7 +41,7 @@ func AttrDeleteHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{"attr_token": request.AttrToken},
Payload: map[string]any{"attr_token": request.AttrToken},
})
return
}
@ -49,7 +49,7 @@ func AttrDeleteHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"attr_token": request.AttrToken,
},
})

View File

@ -46,7 +46,7 @@ func AttrGetHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{"attr_token": request.AttrToken},
Payload: map[string]any{"attr_token": request.AttrToken},
})
return
}
@ -59,7 +59,7 @@ func AttrGetHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"attr_token": request.AttrToken,
"attr_value": attrValue,
},

View File

@ -43,7 +43,7 @@ func AttrSetHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{"attr_token": request.AttrToken},
Payload: map[string]any{"attr_token": request.AttrToken},
})
return
}
@ -51,7 +51,7 @@ func AttrSetHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"attr_token": request.AttrToken,
},
})

View File

@ -37,7 +37,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}
@ -65,7 +65,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicLink,
},
}
@ -89,7 +89,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_infos": topologicCreateInfos,
},
}
@ -111,7 +111,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"component_infos": request.ComponentInfos,
},
}
@ -130,7 +130,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": info.UUID,
"component_params": info.Params,
},
@ -152,7 +152,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
resp := network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}

View File

@ -42,7 +42,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}
@ -70,7 +70,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicLink,
},
}
@ -95,7 +95,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicDelInfo,
},
}
@ -112,7 +112,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicDelInfo,
},
}
@ -138,7 +138,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": componentInfo.UUID,
},
}
@ -162,7 +162,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": componentInfo.UUID,
},
}
@ -184,7 +184,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": componentInfo.UUID,
},
}
@ -205,7 +205,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
resp := network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}

View File

@ -33,7 +33,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": pageID,
},
}
@ -48,14 +48,14 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": pageID,
},
}
c.JSON(http.StatusOK, resp)
return
}
payload := make(map[string]interface{})
payload := make(map[string]any)
payload["root_vertex"] = topologicInfo.RootVertex
payload["topologic"] = topologicInfo.VerticeLinks
@ -69,7 +69,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": componentUUID,
},
}
@ -84,7 +84,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": componentUUID,
},
}
@ -103,7 +103,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": topologicInfo.RootVertex,
},
}
@ -118,7 +118,7 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": rootVertexUUID,
},
}

View File

@ -35,7 +35,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}
@ -52,7 +52,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicLink,
},
}
@ -75,7 +75,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicChangeInfo,
},
}
@ -92,7 +92,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"topologic_info": topologicChangeInfo,
},
}
@ -109,7 +109,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
"component_info": request.ComponentInfos,
},
@ -129,7 +129,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
Payload: map[string]interface{}{
Payload: map[string]any{
"uuid": info.UUID,
"component_params": info.Params,
},
@ -152,7 +152,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
resp := network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"page_id": request.PageID,
},
}

View File

@ -50,7 +50,7 @@ func QueryHistoryDataHandler(c *gin.Context) {
resp := network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
Payload: map[string]interface{}{
Payload: map[string]any{
"events": events,
},
}

View File

@ -64,7 +64,7 @@ func RealTimeDataReceivehandler(c *gin.Context) {
realtimedata.RealTimeDataChan <- request
payload := map[string]interface{}{
payload := map[string]any{
"component_uuid": request.PayLoad.ComponentUUID,
"point": request.PayLoad.Point,
}
@ -82,8 +82,8 @@ func RealTimeDataReceivehandler(c *gin.Context) {
}
}
func processResponse(code int64, msg string, payload map[string]interface{}) []byte {
resp := map[string]interface{}{
func processResponse(code int64, msg string, payload map[string]any) []byte {
resp := map[string]any{
"code": code,
"msg": msg,
"payload": payload,

View File

@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"net/http"
"os"
"strconv"
@ -39,9 +40,7 @@ type lokiSyncer struct {
func newLokiSyncer(lCfg config.LokiConfig) *lokiSyncer {
// always tag development logs with env=development; caller-supplied labels override if needed
labels := map[string]string{"env": "development"}
for k, v := range lCfg.Labels {
labels[k] = v
}
maps.Copy(labels, lCfg.Labels)
ls := &lokiSyncer{
endpoint: lCfg.Endpoint + "/loki/api/v1/push",
labels: labels,

View File

@ -139,7 +139,7 @@ func LogAccess() gin.HandlerFunc {
}
}
func accessLog(c *gin.Context, accessType string, dur time.Duration, body []byte, dataOut interface{}) {
func accessLog(c *gin.Context, accessType string, dur time.Duration, body []byte, dataOut any) {
req := c.Request
bodyStr := string(body)
query := req.URL.RawQuery

View File

@ -11,7 +11,7 @@ type AttrModelInterface interface {
GetZoneInfo() *orm.Zone
GetStationInfo() *orm.Station
GetComponentInfo() *orm.Component
GetAttrValue() interface{} // New method to get the attribute value
GetAttrValue() any // New method to get the attribute value
IsLocal() bool
}

View File

@ -77,7 +77,7 @@ func PollAPIEndpoints(endpoint APIEndpoint) ([]float64, error) {
}
dataLen := len(realDataJSON.Get("data").MustArray())
for i := 0; i < dataLen; i++ {
for i := range dataLen {
valueSlice = append(valueSlice, realDataJSON.Get("data").GetIndex(i).Get("value").MustFloat64())
}
return valueSlice, nil

View File

@ -13,7 +13,7 @@ type AsyncTaskCreateRequest struct {
// enum: TOPOLOGY_ANALYSIS, PERFORMANCE_ANALYSIS, EVENT_ANALYSIS, BATCH_IMPORT
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"异步任务类型"`
// required: true
Params map[string]interface{} `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"`
Params map[string]any `json:"params" swaggertype:"object" description:"任务参数,根据任务类型不同而不同"`
}
// AsyncTaskCreateResponse defines the response structure for creating an asynchronous task
@ -29,16 +29,16 @@ type AsyncTaskResultQueryRequest struct {
// AsyncTaskResult defines the structure for a single task result
type AsyncTaskResult struct {
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"`
Status string `json:"status" example:"COMPLETED" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
Progress *int `json:"progress,omitempty" example:"65" description:"任务进度(0-100)仅当状态为RUNNING时返回"`
CreatedAt int64 `json:"created_at" example:"1741846200" description:"任务创建时间戳"`
FinishedAt *int64 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳仅当状态为COMPLETED或FAILED时返回"`
Result map[string]interface{} `json:"result,omitempty" swaggertype:"object" description:"任务结果仅当状态为COMPLETED时返回"`
ErrorCode *int `json:"error_code,omitempty" example:"400102" description:"错误码仅当状态为FAILED时返回"`
ErrorMessage *string `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息仅当状态为FAILED时返回"`
ErrorDetail map[string]interface{} `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情仅当状态为FAILED时返回"`
TaskID uuid.UUID `json:"task_id" example:"123e4567-e89b-12d3-a456-426614174000" description:"任务唯一标识符"`
TaskType string `json:"task_type" example:"TOPOLOGY_ANALYSIS" description:"任务类型"`
Status string `json:"status" example:"COMPLETED" description:"任务状态SUBMITTED, RUNNING, COMPLETED, FAILED"`
Progress *int `json:"progress,omitempty" example:"65" description:"任务进度(0-100)仅当状态为RUNNING时返回"`
CreatedAt int64 `json:"created_at" example:"1741846200" description:"任务创建时间戳"`
FinishedAt *int64 `json:"finished_at,omitempty" example:"1741846205" description:"任务完成时间戳仅当状态为COMPLETED或FAILED时返回"`
Result map[string]any `json:"result,omitempty" swaggertype:"object" description:"任务结果仅当状态为COMPLETED时返回"`
ErrorCode *int `json:"error_code,omitempty" example:"400102" description:"错误码仅当状态为FAILED时返回"`
ErrorMessage *string `json:"error_message,omitempty" example:"Component UUID not found" description:"错误信息仅当状态为FAILED时返回"`
ErrorDetail map[string]any `json:"error_detail,omitempty" swaggertype:"object" description:"错误详情仅当状态为FAILED时返回"`
}
// AsyncTaskResultQueryResponse defines the response structure for querying task results

View File

@ -8,8 +8,8 @@ type AttrGetRequest struct {
// AttrSetRequest defines the request payload for setting an attribute
type AttrSetRequest struct {
AttrToken string `json:"attr_token"`
AttrValue interface{} `json:"attr_value"`
AttrToken string `json:"attr_token"`
AttrValue any `json:"attr_value"`
}
// AttrDeleteRequest defines the request payload for deleting an attribute

View File

@ -8,7 +8,7 @@ import (
)
// ConvertAnyComponentInfosToComponents define convert any component request info to component struct
func ConvertAnyComponentInfosToComponents(anyInfo interface{}) (*orm.Component, error) {
func ConvertAnyComponentInfosToComponents(anyInfo any) (*orm.Component, error) {
switch info := anyInfo.(type) {
case ComponentCreateInfo:
return ConvertComponentCreateInfosToComponents(info)

View File

@ -28,7 +28,7 @@ func AnchorPoolInit(concurrentQuantity int) (pool *ants.PoolWithFunc, err error)
}
// AnchorFunc defines func that process the real time data of component anchor params
var AnchorFunc = func(poolConfig interface{}) {
var AnchorFunc = func(poolConfig any) {
var firstStart bool
alertManager := alert.GetAlertMangerInstance()

View File

@ -12,7 +12,7 @@ import (
)
// ParseFunc defines func that parses the model data from postgres
var ParseFunc = func(parseConfig interface{}) {
var ParseFunc = func(parseConfig any) {
modelParseConfig, ok := parseConfig.(config.ModelParseConfig)
if !ok {
logger.Error(modelParseConfig.Ctx, "conversion model parse config type failed")

View File

@ -10,7 +10,7 @@ import (
// DataItem define structure for storing data, insertion time, and last access time
type DataItem struct {
Data interface{}
Data any
InsertTime time.Time
LastAccess time.Time
Index int
@ -38,14 +38,14 @@ func (pq priorityQueue) Swap(i, j int) {
pq[j].item.Index = j
}
func (pq *priorityQueue) Push(x interface{}) {
func (pq *priorityQueue) Push(x any) {
n := len(*pq)
queueItem := x.(*priorityQueueItem)
queueItem.item.Index = n
*pq = append(*pq, queueItem)
}
func (pq *priorityQueue) Pop() interface{} {
func (pq *priorityQueue) Pop() any {
old := *pq
n := len(old)
queueItem := old[n-1]
@ -65,7 +65,7 @@ func (pq *priorityQueue) update(item *DataItem, newPrio int64) {
type TimeCache struct {
mu sync.Mutex
capacity int
items map[interface{}]*DataItem
items map[any]*DataItem
pq priorityQueue
}
@ -73,13 +73,13 @@ type TimeCache struct {
func NewTimeCache(capacity int) *TimeCache {
return &TimeCache{
capacity: capacity,
items: make(map[interface{}]*DataItem),
items: make(map[any]*DataItem),
pq: make(priorityQueue, 0, capacity),
}
}
// Add 添加一个新项到缓存中
func (tc *TimeCache) Add(data interface{}) {
func (tc *TimeCache) Add(data any) {
tc.mu.Lock()
defer tc.mu.Unlock()

View File

@ -13,8 +13,8 @@ import (
type Params interface {
Validate() error
GetType() UnifiedTaskType
ToMap() map[string]interface{}
FromMap(params map[string]interface{}) error
ToMap() map[string]any
FromMap(params map[string]any) error
}
// BaseTask provides common functionality for all task implementations

View File

@ -22,10 +22,10 @@ type RetryStrategy interface {
// ExponentialBackoffRetry implements exponential backoff with jitter retry strategy
type ExponentialBackoffRetry struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
RandomFactor float64 // Jitter factor to avoid thundering herd problem
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
RandomFactor float64 // Jitter factor to avoid thundering herd problem
}
// NewExponentialBackoffRetry creates a new exponential backoff retry strategy
@ -67,12 +67,9 @@ func (s *ExponentialBackoffRetry) ShouldRetry(ctx context.Context, taskID string
}
// Calculate exponential backoff: initialDelay * 2^retryCount
delay := s.InitialDelay * time.Duration(math.Pow(2, float64(retryCount)))
// Apply maximum delay cap
if delay > s.MaxDelay {
delay = s.MaxDelay
}
delay := min(
// Apply maximum delay cap
s.InitialDelay*time.Duration(math.Pow(2, float64(retryCount))), s.MaxDelay)
// Add jitter to avoid thundering herd
if s.RandomFactor > 0 {
@ -178,10 +175,10 @@ func (s *NoRetryStrategy) GetMaxRetries() int {
// DefaultRetryStrategy returns the default retry strategy (exponential backoff)
func DefaultRetryStrategy() RetryStrategy {
return NewExponentialBackoffRetry(
constants.TaskRetryMaxDefault, // max retries
constants.TaskRetryInitialDelayDefault, // initial delay
constants.TaskRetryMaxDelayDefault, // max delay
constants.TaskRetryRandomFactorDefault, // random factor (10% jitter)
constants.TaskRetryMaxDefault, // max retries
constants.TaskRetryInitialDelayDefault, // initial delay
constants.TaskRetryMaxDelayDefault, // max delay
constants.TaskRetryRandomFactorDefault, // random factor (10% jitter)
)
}

View File

@ -45,15 +45,15 @@ func (p *TestTaskParams) GetType() UnifiedTaskType {
}
// ToMap converts parameters to map for database storage
func (p *TestTaskParams) ToMap() map[string]interface{} {
return map[string]interface{}{
func (p *TestTaskParams) ToMap() map[string]any {
return map[string]any{
"sleep_duration": p.SleepDuration,
"message": p.Message,
}
}
// FromMap populates parameters from map (for database retrieval)
func (p *TestTaskParams) FromMap(params map[string]interface{}) error {
func (p *TestTaskParams) FromMap(params map[string]any) error {
if v, ok := params["sleep_duration"]; ok {
if duration, isFloat := v.(float64); isFloat {
p.SleepDuration = int(duration)
@ -103,7 +103,7 @@ func (t *TestTask) Execute(ctx context.Context, taskID uuid.UUID, db *gorm.DB) e
time.Sleep(sleepDuration)
// Build result
result := map[string]interface{}{
result := map[string]any{
"status": "completed",
"sleep_duration": params.SleepDuration,
"message": params.Message,

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"sync"
"time"
@ -565,24 +566,16 @@ func (w *TaskWorker) GetMetrics() *WorkerMetrics {
// Deep copy maps to avoid data races
tasksProcessedCopy := make(map[TaskType]int64)
for k, v := range w.metrics.TasksProcessed {
tasksProcessedCopy[k] = v
}
maps.Copy(tasksProcessedCopy, w.metrics.TasksProcessed)
tasksFailedCopy := make(map[TaskType]int64)
for k, v := range w.metrics.TasksFailed {
tasksFailedCopy[k] = v
}
maps.Copy(tasksFailedCopy, w.metrics.TasksFailed)
tasksSuccessCopy := make(map[TaskType]int64)
for k, v := range w.metrics.TasksSuccess {
tasksSuccessCopy[k] = v
}
maps.Copy(tasksSuccessCopy, w.metrics.TasksSuccess)
processingTimeCopy := make(map[TaskType]time.Duration)
for k, v := range w.metrics.ProcessingTime {
processingTimeCopy[k] = v
}
maps.Copy(processingTimeCopy, w.metrics.ProcessingTime)
// Create a copy without the mutex to avoid copylocks warning
return &WorkerMetrics{