// PowerEngine/wave_record/main.go
//
// 157 lines
// 4.0 KiB
// Go
// Raw Normal View History

package main
import (
"flag"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
"wave_record/constant"
"wave_record/util"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"github.com/yonwoo9/go-comtrade"
"go.uber.org/zap"
)
// Command-line flags that tell readConfig where the configuration file lives.
// Each value is only meaningful after flag.Parse has run.
var (
	// comtradeConfigDir is handed to viper as the config search path.
	comtradeConfigDir = flag.String(
		"comtrade_config_dir",
		"./config",
		"config file dir of wave record project",
	)

	// comtradeConfigName is handed to viper as the config file name.
	comtradeConfigName = flag.String(
		"comtrade_config_name",
		"config",
		"config file name of wave record project",
	)

	// comtradeConfigType is handed to viper as the config file type.
	comtradeConfigType = flag.String(
		"comtrade_config_type",
		"yaml",
		"config file type of wave record project",
	)
)
// Package-level state shared by the watcher and the parser.
var (
	// logger is the process-wide structured logger; it is assigned in init.
	logger *zap.Logger

	// monitorDir is the directory watched for new files; readConfig fills it
	// from the "comtrade_file_dir" configuration key.
	monitorDir string

	// addChannel carries the paths of newly created files from dirWatch to
	// parseComtradeFile; it is allocated in init.
	addChannel chan string

	// comtradeMap maps a config file path to the data file path paired with
	// it; an empty string value means no data file is paired.
	comtradeMap sync.Map
)
// init allocates the file-event queue (a buffered channel with room for 10
// paths) and builds the process-wide zap production logger.
//
// It panics when the logger cannot be built. Every other function in this
// file logs through logger, so continuing with a nil logger would only defer
// the crash to an unrelated nil dereference with no hint of the real cause.
func init() {
	addChannel = make(chan string, 10)

	var err error
	if logger, err = zap.NewProduction(); err != nil {
		panic(fmt.Errorf("build zap production logger: %w", err))
	}
}
// main parses the command-line flags, loads the configuration, starts the
// directory watcher in a background goroutine and then processes file events
// on the calling goroutine until parseComtradeFile returns.
func main() {
	flag.Parse()
	// Flush buffered log entries on the way out.
	defer logger.Sync()

	readConfig()

	watchedDir := zap.String("monitor_dir", monitorDir)
	logger.Info("comtrade monitor Dir", watchedDir)

	// Producer: reports created files on addChannel.
	go dirWatch(monitorDir, addChannel)

	// Consumer: blocks here for the lifetime of the program.
	// NOTE(review): comtradeMap is a sync.Map passed by value, so the callee
	// works on a copy (go vet's copylocks check flags this); fixing it means
	// changing parseComtradeFile's signature together with this call.
	parseComtradeFile(addChannel, comtradeMap)
}
// readConfig loads the configuration file selected by the comtrade_config_*
// flags and stores the "comtrade_file_dir" value in monitorDir.
//
// It panics when the file cannot be found or read, and when
// "comtrade_file_dir" is missing or empty: without a directory to watch the
// program cannot do any useful work, so failing at startup is preferable to
// running with a watcher that never receives events.
func readConfig() {
	config := viper.New()
	config.AddConfigPath(*comtradeConfigDir)
	config.SetConfigName(*comtradeConfigName)
	config.SetConfigType(*comtradeConfigType)

	if err := config.ReadInConfig(); err != nil {
		if _, ok := err.(viper.ConfigFileNotFoundError); ok {
			// Keep the underlying error in the message: it is the only
			// place that says which name and locations were searched.
			panic(fmt.Sprintf("can not find config file: %v", err))
		}
		panic(err)
	}

	// get config content
	monitorDir = config.GetString("comtrade_file_dir")
	if monitorDir == "" {
		panic(`config key "comtrade_file_dir" is missing or empty`)
	}
}
// dirWatch watches monitorDir with fsnotify and sends the path of every
// created file to addChan. Removals are only logged.
//
// If the watcher cannot be created or monitorDir cannot be added to it, the
// failure is logged, addChan is closed and the function returns. Closing the
// channel is what lets the consumer notice that no events will ever arrive;
// at that point nothing has been sent yet, so no other sender can be active.
// On the later exit paths (watcher channels closed) addChan is deliberately
// left open, because paths may still be in flight from other senders.
//
// The function blocks until the watcher's Events or Errors channel is closed,
// so it is meant to run in its own goroutine.
func dirWatch(monitorDir string, addChan chan string) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		logger.Error("create dir watcher failed:", zap.Error(err))
		// watcher is unusable here; continuing would dereference nil.
		close(addChan)
		return
	}
	defer watcher.Close()

	if err = watcher.Add(monitorDir); err != nil {
		logger.Error("add monitor dir failed:", zap.Error(err))
		// Nothing is being watched, so looping on events would wait forever.
		close(addChan)
		return
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				logger.Info("file add in watcher dir", zap.String("file_name", event.Name))
				addChan <- event.Name
			}
			if event.Op&fsnotify.Remove == fsnotify.Remove {
				logger.Info("file remove in watcher dir", zap.String("file_name", event.Name))
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				logger.Error("monitor watcher error channel closed", zap.Bool("channel_status", ok))
				return
			}
			logger.Error("monitor watcher error occurred", zap.Error(err))
		}
	}
}
// parseComtradeFile consumes created-file paths from addChan and starts
// parseComtradeData once a config file and its data file are both present.
//
// Pairing state lives in comtradeMap: the key is the config file path and the
// value is the paired data file path, or "" when no data file is pending.
//
//   - Config file: remembered in comtradeMap; parsed if a data file is
//     already paired with it.
//   - Data file: the sibling config path is derived by swapping the
//     extension. If that file exists the pair is parsed; otherwise the data
//     path is put back on addChan after a short delay and checked again.
//   - Anything else: logged and skipped.
//
// The function returns when addChan is closed. addChan must not be closed
// while a delayed re-queue is pending, since the pending send would panic.
//
// NOTE(review): comtradeMap is received by value, so this function works on
// its own copy of the caller's sync.Map. The signature is kept for
// compatibility with the existing call site.
func parseComtradeFile(addChan chan string, comtradeMap sync.Map) {
	// Delay before re-checking a data file whose config file is not there yet.
	const configRetryDelay = time.Second

	for {
		addFilePath, ok := <-addChan
		if !ok {
			logger.Error("monitor add channel closed", zap.Bool("channel_status", ok))
			return
		}

		lastElement := filepath.Base(addFilePath)
		fileExtension := filepath.Ext(lastElement)
		// Strip only the final extension so base names that themselves
		// contain dots are kept whole.
		fileName := strings.TrimSuffix(lastElement, fileExtension)
		logger.Info("add comtrade file info", zap.String("file_path", addFilePath),
			zap.String("file_name", fileName), zap.String("file_extension", fileExtension))

		switch fileExtension {
		case constant.ConfigFileSuffix:
			dispatchComtradeConfig(&comtradeMap, addFilePath)

		case constant.DataFileSuffix:
			// Build the config path next to the data file so it matches the
			// path reported for the config file in the same directory.
			configFilePath := filepath.Join(filepath.Dir(addFilePath), fileName+constant.ConfigFileSuffix)
			logger.Info("config path of comtrade file", zap.String("file_path", configFilePath))

			if !util.FileExists(configFilePath) {
				// Retry later from a timer goroutine. Sending from this
				// goroutine could block forever on a full channel, because
				// this loop is the channel's only receiver.
				time.AfterFunc(configRetryDelay, func() { addChan <- addFilePath })
				continue
			}

			comtradeMap.Store(configFilePath, addFilePath)
			dispatchComtradeConfig(&comtradeMap, configFilePath)

		default:
			logger.Warn("no support file style", zap.String("file_style", fileExtension))
		}
	}
}

// dispatchComtradeConfig records that configFilePath has been seen and, when
// a data file is paired with it in comtradeMap, starts parsing the pair in a
// new goroutine.
func dispatchComtradeConfig(comtradeMap *sync.Map, configFilePath string) {
	pending, seen := comtradeMap.Load(configFilePath)
	if !seen {
		comtradeMap.Store(configFilePath, "")
		return
	}

	dataFilePath, _ := pending.(string)
	if dataFilePath == "" {
		// Config already known and no data file is waiting for it.
		return
	}

	// Clear the pairing before parsing so that a second notification for the
	// same config file does not parse the same pair again.
	comtradeMap.Store(configFilePath, "")
	go parseComtradeData(configFilePath, dataFilePath)
}
// parseComtradeData parses one comtrade config/data file pair.
//
// On failure the error is logged together with both file paths; parses run
// in separate goroutines, so the paths are what ties a failure to its files.
// On success it prints a marker line and the Ft field of the parsed
// configuration to standard output.
func parseComtradeData(configFilePath, dataFilePath string) {
	c, err := comtrade.ParseComtrade(configFilePath, dataFilePath)
	if err != nil {
		logger.Error("parse comtrade file failed",
			zap.String("config_file", configFilePath),
			zap.String("data_file", dataFilePath),
			zap.Error(err))
		return
	}

	fmt.Println("parse succcess")
	fmt.Println(c.Conf.Ft)
	// TODO: store the parsed file in MongoDB.
}