PowerEngine/wave_record/main.go

77 lines
1.6 KiB
Go
Raw Normal View History

package main
import (
"context"
"flag"
"fmt"
"sync"
"wave_record/comtrade"
"wave_record/config"
"wave_record/database"
"wave_record/log"
"wave_record/util"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"github.com/panjf2000/ants/v2"
)
// Command-line flags that locate the configuration file of the wave record
// project. They are read in main after flag.Parse.

// comtradeConfigDir is the directory that holds the configuration file.
var comtradeConfigDir = flag.String(
	"comtrade_config_dir",
	"./config",
	"config file dir of wave record project",
)

// comtradeConfigName is the configuration file name.
var comtradeConfigName = flag.String(
	"comtrade_config_name",
	"config",
	"config file name of wave record project",
)

// comtradeConfigType is the configuration file type.
var comtradeConfigType = flag.String(
	"comtrade_config_type",
	"yaml",
	"config file type of wave record project",
)
// Package-level state shared by init and main.

// waveRecordConfig holds the configuration returned by
// config.ReadAndInitConfig in main.
var waveRecordConfig config.WaveRecordConfig

// addChannel is the string channel handed to both util.DirWatch and
// comtrade.ParseComtradeFile; it is allocated in init.
var addChannel chan string

// comtradeMap is passed by pointer to comtrade.ParseComtradeFile.
var comtradeMap sync.Map

// mongoDBClient is the MongoDB client obtained from
// database.GetMongoDBInstance in main.
var mongoDBClient *mongo.Client

// logger is the zap logger obtained from log.GetLoggerInstance in main.
var logger *zap.Logger
// init allocates the buffered channel stored in addChannel before main runs.
func init() {
	// Capacity of the addChannel buffer.
	const addChannelBuffer = 10

	addChannel = make(chan string, addChannelBuffer)
}
// main wires up the wave record service: it parses the command-line flags,
// loads the configuration, initialises the MongoDB client and the logger,
// builds an ants worker pool around comtrade.ParseFunc, starts the directory
// watcher in a goroutine and finally calls comtrade.ParseComtradeFile on the
// main goroutine.
//
// Deferred cleanup runs last-in-first-out: the worker pool is released first,
// then ants' default pool, then the logger is synced, and the MongoDB client
// is disconnected last.
func main() {
	flag.Parse()

	// main owns the root context, so Background is the appropriate choice.
	ctx := context.Background()

	waveRecordConfig = config.ReadAndInitConfig(*comtradeConfigDir, *comtradeConfigName, *comtradeConfigType)

	// init mongoDBClient
	mongoDBClient = database.GetMongoDBInstance(ctx, waveRecordConfig.MongoDBURI)
	// init logger
	logger = log.GetLoggerInstance(waveRecordConfig.LCfg)

	defer func() {
		if err := mongoDBClient.Disconnect(ctx); err != nil {
			panic(err)
		}
	}()
	// Best-effort flush of buffered log entries on exit.
	defer logger.Sync()
	// ants.Release only closes ants' package-level default pool; it does not
	// touch the pool created below.
	// NOTE(review): kept in case other packages submit to the default pool — confirm.
	defer ants.Release()

	pool, err := ants.NewPoolWithFunc(waveRecordConfig.ParseConcurrentQuantity, comtrade.ParseFunc)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the pool created above; previously it was never released.
	defer pool.Release()

	// NOTE(review): this submits the integers 0..14 to the pool before any
	// directory watching starts — looks like leftover smoke-test code; confirm.
	for i := 0; i < 15; i++ {
		if err := pool.Invoke(i); err != nil {
			fmt.Println(err)
		}
	}

	// The watcher runs in its own goroutine and shares addChannel with the
	// parser call below.
	// NOTE(review): presumably DirWatch publishes newly seen file names on
	// addChannel for ParseComtradeFile to consume — verify against both packages.
	go util.DirWatch(waveRecordConfig.MonitorDir, addChannel)
	comtrade.ParseComtradeFile(waveRecordConfig.MonitorDir, addChannel, &comtradeMap)
}