perf:add error constant configuration
This commit is contained in:
parent
056093d1c4
commit
eb5871b588
|
|
@ -1,76 +0,0 @@
|
||||||
// Package comtrade define related functions for comtrade data processing
|
|
||||||
package comtrade
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"wave_record/constant"
|
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DirWatch define function for monitoring directories and passing add file name to channels
|
|
||||||
func DirWatch(monitorDir string, addChan chan string) {
|
|
||||||
logger := zap.L()
|
|
||||||
|
|
||||||
watcher, err := fsnotify.NewWatcher()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("create dir watcher failed:", zap.Error(err))
|
|
||||||
}
|
|
||||||
defer watcher.Close()
|
|
||||||
|
|
||||||
err = watcher.Add(monitorDir)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("add monitor dir failed:", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-watcher.Events:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.Op&fsnotify.Create == fsnotify.Create {
|
|
||||||
logger.Info("file add in watcher dir", zap.String("file_name", event.Name))
|
|
||||||
addChan <- event.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.Op&fsnotify.Remove == fsnotify.Remove {
|
|
||||||
logger.Info("file remove in watcher dir", zap.String("file_name", event.Name))
|
|
||||||
}
|
|
||||||
case err, ok := <-watcher.Errors:
|
|
||||||
if !ok {
|
|
||||||
logger.Error("monitor watcher error channel closed", zap.Bool("channel_status", ok))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Error("monitor watcher error occurred", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TraverseMonitorDir define function for traverse the files existing in the directory
|
|
||||||
func TraverseMonitorDir(ctx context.Context, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc) {
|
|
||||||
logger := zap.L()
|
|
||||||
err := filepath.Walk(monitorDir, func(path string, info os.FileInfo, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("accessing a path failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fileSuffix := filepath.Ext(path)
|
|
||||||
if !info.IsDir() && fileSuffix == constant.ConfigFileSuffix {
|
|
||||||
processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("traversing files that already exist in the monitor directory failed", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,162 +0,0 @@
|
||||||
// Package comtrade define related functions for comtrade data processing
|
|
||||||
package comtrade
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"wave_record/config"
|
|
||||||
"wave_record/constant"
|
|
||||||
"wave_record/database"
|
|
||||||
"wave_record/util"
|
|
||||||
|
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"github.com/yonwoo9/go-comtrade"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ParseFunc define ants concourrent exec func
|
|
||||||
var ParseFunc = func(parseConfig interface{}) {
|
|
||||||
logger := zap.L()
|
|
||||||
comtradeStorageConfig, ok := parseConfig.(config.ComtradeDataStorageConfig)
|
|
||||||
if !ok {
|
|
||||||
// TODO 增加日志报错中输出文件名
|
|
||||||
logger.Error("conversion parsing comtrade parameter type failed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
comtradeData, err := parseComtradeData(comtradeStorageConfig.ConfigFilePath, comtradeStorageConfig.DataFilePath)
|
|
||||||
if err != nil {
|
|
||||||
// TODO 增加日志报错中输出文件名
|
|
||||||
logger.Error("parsing comtrade data failed", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
collection := database.MongoDBClient().Database(comtradeStorageConfig.DBName).Collection(comtradeData.Conf.StationName)
|
|
||||||
ctx, cancel := context.WithTimeout(comtradeStorageConfig.Ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
err = database.
|
|
||||||
StorageComtradeIntoMongoDB(ctx, comtradeData, collection, logger)
|
|
||||||
if err != nil {
|
|
||||||
// TODO 增加处理失败的文件名
|
|
||||||
logger.Error("stroage comtrade data info mongoDB failed", zap.Error(err))
|
|
||||||
}
|
|
||||||
comtradeStorageConfig.DelChan <- comtradeStorageConfig.ConfigFilePath
|
|
||||||
comtradeStorageConfig.DelChan <- comtradeStorageConfig.DataFilePath
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, error) {
|
|
||||||
logger := zap.L()
|
|
||||||
|
|
||||||
comtrade, err := comtrade.ParseComtrade(configFilePath, dataFilePath)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("parse comtrade file failed", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return comtrade, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ParseComtradeFile define comtrade file parse func
|
|
||||||
func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, addChan chan string, delChan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) {
|
|
||||||
logger := zap.L()
|
|
||||||
|
|
||||||
for {
|
|
||||||
addFilePath, ok := <-addChan
|
|
||||||
if !ok {
|
|
||||||
logger.Error("monitor add channel closed", zap.Bool("channel_status", ok))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, constant.ErrNotFindDatFile) {
|
|
||||||
logger.Info("process comtrade file failed", zap.String("err", err.Error()))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
logger.Error("process comtrade file failed", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func processComtradeFile(ctx context.Context, addFilePath string, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc, logger *zap.Logger) error {
|
|
||||||
lastElement := filepath.Base(addFilePath)
|
|
||||||
fileName := strings.Split(lastElement, ".")[0]
|
|
||||||
fileExtension := filepath.Ext(lastElement)
|
|
||||||
// logger.Info("add comtrade file info", zap.String("file_path", addFilePath),
|
|
||||||
// zap.String("file_name", fileName), zap.String("file_extension", fileExtension))
|
|
||||||
|
|
||||||
switch fileExtension {
|
|
||||||
case constant.ConfigFileSuffix:
|
|
||||||
err := processConfigFile(ctx, dbName, addFilePath, delChan, comtradeMap, pool)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("process comtrade failed:%w", err)
|
|
||||||
}
|
|
||||||
case constant.DataFileSuffix:
|
|
||||||
err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("process comtrade failed:%w", err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
logger.Warn("no support file style", zap.String("file_style", fileExtension))
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error {
|
|
||||||
fmt.Printf("configFilePath:%v\n", configFilePath)
|
|
||||||
dataFilePath, err := util.FileNameConvert(configFilePath)
|
|
||||||
fmt.Printf("dataFilePath:%v\n", dataFilePath)
|
|
||||||
fmt.Printf("err:%v", err)
|
|
||||||
exist := util.FileExists(dataFilePath)
|
|
||||||
if !exist {
|
|
||||||
comtradeMap.Store(configFilePath, "")
|
|
||||||
return constant.ErrNotFindDatFile
|
|
||||||
}
|
|
||||||
|
|
||||||
pool.Invoke(config.ComtradeDataStorageConfig{
|
|
||||||
Ctx: ctx,
|
|
||||||
DelChan: delchan,
|
|
||||||
DBName: dbName,
|
|
||||||
ConfigFilePath: configFilePath,
|
|
||||||
DataFilePath: dataFilePath,
|
|
||||||
})
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string) error {
|
|
||||||
configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "")
|
|
||||||
configFilePath := filepath.Join(monitorDir, configFileName)
|
|
||||||
exist := util.FileExists(configFilePath)
|
|
||||||
if exist {
|
|
||||||
comtradeMap.Store(configFilePath, dataFilePath)
|
|
||||||
addChan <- configFilePath
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// addChan <- dataFilePath
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MoveComtradeFile define comtrade file remove from comtradeMap and move comtrade file from monitor dir to backup dir func
|
|
||||||
func MoveComtradeFile(backupDir string, comtradeMap *sync.Map, delChan chan string) {
|
|
||||||
logger := zap.L()
|
|
||||||
for {
|
|
||||||
filePath := <-delChan
|
|
||||||
fileName := filepath.Base(filePath)
|
|
||||||
fileExtension := filepath.Ext(fileName)
|
|
||||||
if fileExtension == constant.ConfigFileSuffix {
|
|
||||||
comtradeMap.Delete(filePath)
|
|
||||||
}
|
|
||||||
backupFilePath := filepath.Join(backupDir, fileName)
|
|
||||||
err := util.RemoveFile(filePath, backupFilePath)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("remove file to backup dir failed", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -5,3 +5,6 @@ import "errors"
|
||||||
|
|
||||||
// ErrNotFindDatFile define error for not find comtrade data file
|
// ErrNotFindDatFile define error for not find comtrade data file
|
||||||
var ErrNotFindDatFile = errors.New("can not find dat file in map")
|
var ErrNotFindDatFile = errors.New("can not find dat file in map")
|
||||||
|
|
||||||
|
// ErrNotSupportFileType define error for not support file type
|
||||||
|
var ErrNotSupportFileType = errors.New("not support file type")
|
||||||
|
|
|
||||||
|
|
@ -56,5 +56,5 @@ func FileNameConvert(filePath string) (string, error) {
|
||||||
fileName := strings.Join([]string{name, constant.ConfigFileSuffix}, "")
|
fileName := strings.Join([]string{name, constant.ConfigFileSuffix}, "")
|
||||||
return filepath.Join(dirPath, fileName), nil
|
return filepath.Join(dirPath, fileName), nil
|
||||||
}
|
}
|
||||||
return "", nil
|
return "", constant.ErrNotSupportFileType
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue