modelRT/main.go

244 lines
6.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// entry function
package main
import (
"context"
"flag"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"modelRT/alert"
"modelRT/config"
"modelRT/database"
"modelRT/diagram"
locker "modelRT/distributedlock"
_ "modelRT/docs"
"modelRT/handler"
"modelRT/logger"
"modelRT/middleware"
"modelRT/model"
"modelRT/pool"
"modelRT/router"
"modelRT/util"
realtimedata "modelRT/real-time-data"
"github.com/gin-gonic/gin"
"github.com/panjf2000/ants/v2"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"gorm.io/gorm"
)
var limiter *middleware.Limiter
func init() {
limiter = middleware.NewLimiter(10, 1*time.Minute) // 设置限流器允许每分钟最多请求10次
}
var (
modelRTConfigDir = flag.String("modelRT_config_dir", "./configs", "config file dir of model runtime service")
modelRTConfigName = flag.String("modelRT_config_name", "config", "config file name of model runtime service")
modelRTConfigType = flag.String("modelRT_config_type", "yaml", "config file type of model runtime service")
)
var (
modelRTConfig config.ModelRTConfig
postgresDBClient *gorm.DB
alertManager *alert.EventManager
)
// TODO 使用 wire 依赖注入管理 DVIE 面板注册的 panel
// @title ModelRT 实时模型服务 API 文档
// @version 1.0
// @description 实时数据计算和模型运行服务的 API 服务
// TODO termsOfService服务条款待后续优化
// // @termsOfService http://swagger.io/terms/
//
// @contact.name douxu
// TODO 修改支持的文档地址
// @contact.url http://www.swagger.io/support
// @contact.email douxu@clea.com.cn
//
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
//
// @host localhost:8080
// @BasePath /api/v1
func main() {
flag.Parse()
ctx := context.TODO()
// init logger
logger.InitLoggerInstance(modelRTConfig.LoggerConfig)
configPath := filepath.Join(*modelRTConfigDir, *modelRTConfigName+"."+*modelRTConfigType)
if _, err := os.Stat(configPath); os.IsNotExist(err) {
logger.Info(ctx, "configuration file not found,checking for example file")
exampleConfigPath := filepath.Join(*modelRTConfigDir, *modelRTConfigName+".example."+*modelRTConfigType)
if _, err := os.Stat(exampleConfigPath); err == nil {
if err := util.CopyFile(exampleConfigPath, configPath); err != nil {
logger.Error(ctx, "failed to copy example config file", "error", err)
panic(err)
}
logger.Info(ctx, "successfully copied example config to actual config file")
} else {
logger.Error(ctx, "No config file and no config example file found.")
}
}
modelRTConfig = config.ReadAndInitConfig(*modelRTConfigDir, *modelRTConfigName, *modelRTConfigType)
hostName, err := os.Hostname()
if err != nil {
logger.Error(ctx, "get host name failed", "error", err)
panic(err)
}
serviceToken, err := util.GenerateClientToken(hostName, modelRTConfig.ServiceConfig.ServiceName, modelRTConfig.ServiceConfig.SecretKey)
if err != nil {
logger.Error(ctx, "generate client token failed", "error", err)
panic(err)
}
// init postgresDBClient
postgresDBClient = database.InitPostgresDBInstance(ctx, modelRTConfig.PostgresDBURI)
defer func() {
sqlDB, err := postgresDBClient.DB()
if err != nil {
panic(err)
}
sqlDB.Close()
}()
// init alert manager
_ = alert.InitAlertEventManager()
// init model parse ants pool
parsePool, err := ants.NewPoolWithFunc(modelRTConfig.ParseConcurrentQuantity, pool.ParseFunc)
if err != nil {
logger.Error(ctx, "init concurrent parse task pool failed", "error", err)
panic(err)
}
defer parsePool.Release()
searchPool, err := util.NewRedigoPool(modelRTConfig.StorageRedisConfig)
defer searchPool.Close()
model.InitAutocompleterWithPool(searchPool)
storageClient := diagram.InitRedisClientInstance(modelRTConfig.StorageRedisConfig)
defer storageClient.Close()
lockerClient := locker.InitClientInstance(modelRTConfig.LockerRedisConfig)
defer lockerClient.Close()
// init anchor param ants pool
anchorRealTimePool, err := pool.AnchorPoolInit(modelRTConfig.RTDReceiveConcurrentQuantity)
if err != nil {
logger.Error(ctx, "init concurrent anchor param task pool failed", "error", err)
panic(err)
}
defer anchorRealTimePool.Release()
// init cancel context
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
// init real time data receive channel
go realtimedata.ReceiveChan(cancelCtx)
postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres
// componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool)
// if err != nil {
// logger.Error(ctx, "load circuit diagrams from postgres failed", "error", err)
// panic(err)
// }
// TODO 暂时屏蔽完成 swagger 启动测试
tree, err := database.QueryTopologicFromDB(ctx, tx)
if err != nil {
logger.Error(ctx, "load topologic info from postgres failed", "error", err)
panic(err)
}
diagram.GlobalTree = tree
return nil
})
// TODO 完成订阅数据分析
// TODO 暂时屏蔽完成 swagger 启动测试
// go realtimedata.RealTimeDataComputer(ctx, nil, []string{}, "")
// use release mode in productio
// gin.SetMode(gin.ReleaseMode)
engine := gin.New()
router.RegisterRoutes(engine, serviceToken)
// real time data api
engine.GET("/ws/rtdatas", handler.RealTimeDataReceivehandler)
// anchor api
engine.POST("/model/anchor_replace", handler.ComponentAnchorReplaceHandler)
// alert api
engine.GET("/alert/events/query", handler.QueryAlertEventHandler)
// real time data api
engine.GET("/rt/datas/query", handler.QueryRealTimeDataHandler)
// dashborad api
dashboard := engine.Group("/dashboard", limiter.Middleware)
{
dashboard.GET("/load", nil)
dashboard.GET("/query", nil)
dashboard.POST("/create", nil)
dashboard.POST("/update", nil)
dashboard.POST("/delete", nil)
}
// Swagger UI
engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
// 注册 Swagger UI 路由
// docs.SwaggerInfo.BasePath = "/model"
// v1 := engine.Group("/api/v1")
// {
// eg := v1.Group("/example")
// {
// eg.GET("/helloworld", Helloworld)
// }
// }
server := http.Server{
Addr: ":8080",
Handler: engine,
}
// creating a System Signal Receiver
done := make(chan os.Signal, 10)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-done
if err := server.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "ShutdownServerError", "err", err)
}
}()
logger.Info(ctx, "starting ModelRT server")
err = server.ListenAndServe()
if err != nil {
if err == http.ErrServerClosed {
// the service receives the shutdown signal normally and then closes
logger.Info(ctx, "Server closed under request")
} else {
// abnormal shutdown of service
logger.Error(ctx, "Server closed unexpected", "err", err)
}
}
}