optimize demo code

This commit is contained in:
douxu 2024-12-25 16:34:57 +08:00
parent c3f7ddf210
commit 39e380ee1e
11 changed files with 116 additions and 96 deletions

View File

@ -0,0 +1,13 @@
{
"params_list": [{
"anchor_name": "voltage",
"func_type": "1",
"upper_limit": 23,
"lower_limit": 0.5
}, {
"anchor_name": "current",
"func_type": "2",
"upper_limit": 23,
"lower_limit": 0.5
}]
}

View File

@ -10,10 +10,11 @@ import (
"modelRT/constant"
"modelRT/database"
"modelRT/diagram"
"modelRT/log"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/orm"
realtimedata "modelRT/real-time-data"
"github.com/bitly/go-simplejson"
@ -24,7 +25,7 @@ import (
// ComponentAnchorReplaceHandler define component anchor point replace process API
func ComponentAnchorReplaceHandler(c *gin.Context) {
var uuid, anchorName string
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
pgClient := database.GetPostgresDBClient()
cancelCtx, cancel := context.WithTimeout(c, 5*time.Second)

View File

@ -6,7 +6,7 @@ import (
"modelRT/database"
"modelRT/diagram"
"modelRT/log"
"modelRT/logger"
"modelRT/network"
"github.com/bitly/go-simplejson"
@ -17,7 +17,7 @@ import (
// CircuitDiagramCreateHandler define circuit diagram create process API
func CircuitDiagramCreateHandler(c *gin.Context) {
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
pgClient := database.GetPostgresDBClient()
var request network.CircuitDiagramCreateRequest

View File

@ -9,7 +9,7 @@ import (
"modelRT/constant"
"modelRT/database"
"modelRT/diagram"
"modelRT/log"
"modelRT/logger"
"modelRT/model"
"modelRT/network"
"modelRT/orm"
@ -21,7 +21,7 @@ import (
// CircuitDiagramDeleteHandler define circuit diagram delete process API
func CircuitDiagramDeleteHandler(c *gin.Context) {
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
pgClient := database.GetPostgresDBClient()
var request network.CircuitDiagramDeleteRequest

View File

@ -5,7 +5,7 @@ import (
"strconv"
"modelRT/diagram"
"modelRT/log"
"modelRT/logger"
"modelRT/network"
"github.com/gin-gonic/gin"
@ -23,7 +23,7 @@ import (
// @Failure 400 {object} network.FailureResponse "request process failed"
// @Router /model/diagram_load/{page_id} [get]
func CircuitDiagramLoadHandler(c *gin.Context) {
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
pageID, err := strconv.ParseInt(c.Query("page_id"), 10, 64)
if err != nil {
logger.Error("get pageID from url param failed", zap.Error(err))

View File

@ -5,7 +5,7 @@ import (
"modelRT/database"
"modelRT/diagram"
"modelRT/log"
"modelRT/logger"
"modelRT/network"
"github.com/bitly/go-simplejson"
@ -15,7 +15,7 @@ import (
// CircuitDiagramUpdateHandler define circuit diagram update process API
func CircuitDiagramUpdateHandler(c *gin.Context) {
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
pgClient := database.GetPostgresDBClient()
var request network.CircuitDiagramUpdateRequest

View File

@ -1,5 +1,5 @@
// Package log define log struct of wave record project
package log
// Package logger define log struct of wave record project
package logger
import (
"os"

20
main.go
View File

@ -10,7 +10,7 @@ import (
"modelRT/database"
_ "modelRT/docs"
"modelRT/handler"
"modelRT/log"
"modelRT/logger"
"modelRT/middleware"
"modelRT/pool"
realtimedata "modelRT/real-time-data"
@ -39,7 +39,7 @@ var (
var (
modelRTConfig config.ModelRTConfig
postgresDBClient *gorm.DB
logger *zap.Logger
zapLogger *zap.Logger
)
// TODO 使用 wire 依赖注入管理 DVIE 面板注册的 panel
@ -60,13 +60,13 @@ func main() {
}()
// init logger
logger = log.InitLoggerInstance(modelRTConfig.LoggerConfig)
defer logger.Sync()
zapLogger = logger.InitLoggerInstance(modelRTConfig.LoggerConfig)
defer zapLogger.Sync()
// init model parse ants pool
parsePool, err := ants.NewPoolWithFunc(modelRTConfig.ParseConcurrentQuantity, pool.ParseFunc)
if err != nil {
logger.Error("init concurrent parse task pool failed", zap.Error(err))
zapLogger.Error("init concurrent parse task pool failed", zap.Error(err))
panic(err)
}
defer parsePool.Release()
@ -75,7 +75,7 @@ func main() {
// TODO 优化轮询池初始化参数定义方式,改为从 config 中获取
pollingPool, err := ants.NewPoolWithFunc(1000, pool.AnchorFunc)
if err != nil {
logger.Error("init concurrent data polling task pool failed", zap.Error(err))
zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err))
panic(err)
}
defer pollingPool.Release()
@ -86,16 +86,16 @@ func main() {
go realtimedata.DataPolling(cancelCtx, pollingPool)
// load circuit diagram from postgres
err = database.QueryCircuitDiagramComponentFromDB(ctx, parsePool, logger)
err = database.QueryCircuitDiagramComponentFromDB(ctx, parsePool, zapLogger)
if err != nil {
logger.Error("load circuit diagrams from postgres failed", zap.Error(err))
zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err))
panic(err)
}
// TODO 暂时屏蔽完成 swagger 启动测试
err = database.QueryTopologicFromDB(ctx, logger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID)
err = database.QueryTopologicFromDB(ctx, zapLogger, modelRTConfig.GridID, modelRTConfig.ZoneID, modelRTConfig.StationID)
if err != nil {
logger.Error("load topologic info from postgres failed", zap.Error(err))
zapLogger.Error("load topologic info from postgres failed", zap.Error(err))
panic(err)
}

78
network/api_endpoint.go Normal file
View File

@ -0,0 +1,78 @@
// Package network define struct of network operation
package network
import (
"fmt"
"io"
"net/http"
"strings"
"time"
"modelRT/logger"
"go.uber.org/zap"
)
// APIEndpoint defines an api endpoint struct to poll data from dataRT service
type APIEndpoint struct {
URL string `json:"url"`
Method string `json:"method"` // HTTP 方法,如 "GET", "POST"
Headers map[string]string `json:"headers"`
QueryParams map[string]string `json:"query_params"`
Body string `json:"body"` // 对于 POST 请求需要一个请求体
Interval int `json:"interval"` // 轮询间隔时间(秒)
}
// fetchAPI defines execute http request and return response or error
func fetchAPI(endpoint APIEndpoint) (string, error) {
client := &http.Client{}
req, err := http.NewRequest(endpoint.Method, endpoint.URL, nil)
if err != nil {
return "", err
}
for key, value := range endpoint.Headers {
req.Header.Set(key, value)
}
query := req.URL.Query()
for key, value := range endpoint.QueryParams {
query.Set(key, value)
}
req.URL.RawQuery = query.Encode()
if endpoint.Method == "POST" || endpoint.Method == "PUT" {
req.Body = io.NopCloser(strings.NewReader(endpoint.Body))
req.Header.Set("Content-Type", "application/json")
}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
// pollAPIEndpoints defines unmarshal polling data from http request
func pollAPIEndpoints(endpoint APIEndpoint) {
logger := logger.GetLoggerInstance()
respStr, err := fetchAPI(endpoint)
if err != nil {
logger.Error("unmarshal component anchor point replace info failed", zap.Error(err))
return
}
fmt.Println(respStr)
time.Sleep(time.Duration(endpoint.Interval) * time.Second)
// 注意:这里使用了 endpoint.Interval 而不是传入的 interval
// 但为了示例简单,我们统一使用传入的 interval。
// 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。
}

View File

@ -41,75 +41,3 @@ var AnchorFunc = func(anchorConfig interface{}) {
}
}
}
// type APIEndpoint struct {
// URL string `json:"url"`
// Method string `json:"method"` // HTTP 方法,如 "GET", "POST"
// Headers map[string]string `json:"headers"`
// QueryParams map[string]string `json:"query_params"`
// Body string `json:"body"` // 对于 POST 请求等,可能需要一个请求体
// Interval int `json:"interval"` // 轮询间隔时间(秒)
// }
// // fetchAPI 执行 HTTP 请求并返回响应体(作为字符串)或错误
// func fetchAPI(endpoint APIEndpoint) (string, error) {
// client := &http.Client{}
// // 构建请求
// req, err := http.NewRequest(endpoint.Method, endpoint.URL, nil)
// if err != nil {
// return "", err
// }
// // 设置请求头
// for key, value := range endpoint.Headers {
// req.Header.Set(key, value)
// }
// // 设置查询参数(如果需要)
// q := req.URL.Query()
// for key, value := range endpoint.QueryParams {
// q.Set(key, value)
// }
// req.URL.RawQuery = q.Encode()
// // 设置请求体(如果需要,例如 POST 请求)
// if endpoint.Method == "POST" || endpoint.Method == "PUT" {
// req.Body = ioutil.NopCloser(strings.NewReader(endpoint.Body))
// req.Header.Set("Content-Type", "application/json") // 假设是 JSON 请求体
// }
// // 执行请求
// resp, err := client.Do(req)
// if err != nil {
// return "", err
// }
// defer resp.Body.Close()
// // 读取响应体
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// return "", err
// }
// return string(body), nil
// }
// // pollAPIEndpoints 轮询 API 端点列表,并根据指定的间隔时间执行请求
// func pollAPIEndpoints(endpoints []APIEndpoint, interval int, wg *sync.WaitGroup, results chan<- string) {
// defer wg.Done()
// for _, endpoint := range endpoints {
// for {
// body, err := fetchAPI(endpoint)
// if err != nil {
// log.Printf("Error fetching from %s: %v", endpoint.URL, err)
// } else {
// results <- fmt.Sprintf("Response from %s: %s", endpoint.URL, body)
// }
// time.Sleep(time.Duration(interval) * time.Second)
// // 注意:这里使用了 endpoint.Interval 而不是传入的 interval
// // 但为了示例简单,我们统一使用传入的 interval。
// // 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。
// }
// }
// }

View File

@ -9,7 +9,7 @@ import (
"syscall"
"time"
"modelRT/log"
"modelRT/logger"
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
@ -22,7 +22,7 @@ func RealTimeDataComputer(ctx context.Context, consumerConfig kafka.ConfigMap, t
defer cancel()
// get a logger
logger := log.GetLoggerInstance()
logger := logger.GetLoggerInstance()
// setup a channel to listen for interrupt signals
// TODO 将中断信号放到入参中