From eb5871b5884a42c3bfe236a12b6e1cea3f537c93 Mon Sep 17 00:00:00 2001 From: douxu <921247973@qq.com> Date: Wed, 31 Jul 2024 15:14:08 +0800 Subject: [PATCH] perf:add error constant configuration --- wave_record/comtrade/dir_monitor.go | 76 ------------- wave_record/comtrade/parse.go | 162 ---------------------------- wave_record/constant/errors.go | 3 + wave_record/util/file.go | 2 +- 4 files changed, 4 insertions(+), 239 deletions(-) delete mode 100644 wave_record/comtrade/dir_monitor.go delete mode 100644 wave_record/comtrade/parse.go diff --git a/wave_record/comtrade/dir_monitor.go b/wave_record/comtrade/dir_monitor.go deleted file mode 100644 index 3791477..0000000 --- a/wave_record/comtrade/dir_monitor.go +++ /dev/null @@ -1,76 +0,0 @@ -// Package comtrade define related functions for comtrade data processing -package comtrade - -import ( - "context" - "os" - "path/filepath" - "sync" - - "wave_record/constant" - - "github.com/fsnotify/fsnotify" - "github.com/panjf2000/ants/v2" - "go.uber.org/zap" -) - -// DirWatch define function for monitoring directories and passing add file name to channels -func DirWatch(monitorDir string, addChan chan string) { - logger := zap.L() - - watcher, err := fsnotify.NewWatcher() - if err != nil { - logger.Error("create dir watcher failed:", zap.Error(err)) - } - defer watcher.Close() - - err = watcher.Add(monitorDir) - if err != nil { - logger.Error("add monitor dir failed:", zap.Error(err)) - } - - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - - if event.Op&fsnotify.Create == fsnotify.Create { - logger.Info("file add in watcher dir", zap.String("file_name", event.Name)) - addChan <- event.Name - } - - if event.Op&fsnotify.Remove == fsnotify.Remove { - logger.Info("file remove in watcher dir", zap.String("file_name", event.Name)) - } - case err, ok := <-watcher.Errors: - if !ok { - logger.Error("monitor watcher error channel closed", zap.Bool("channel_status", ok)) - return - } - - logger.Error("monitor watcher error occurred", zap.Error(err)) - } - } -} - -// TraverseMonitorDir define function for traverse the files existing in the directory -func TraverseMonitorDir(ctx context.Context, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc) { - logger := zap.L() - err := filepath.Walk(monitorDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - logger.Error("accessing a path failed", zap.Error(err)) - return err - } - - fileSuffix := filepath.Ext(path) - if !info.IsDir() && fileSuffix == constant.ConfigFileSuffix { - processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) - } - return nil - }) - if err != nil { - logger.Error("traversing files that already exist in the monitor directory failed", zap.Error(err)) - } -} diff --git a/wave_record/comtrade/parse.go b/wave_record/comtrade/parse.go deleted file mode 100644 index 30fbb4d..0000000 --- a/wave_record/comtrade/parse.go +++ /dev/null @@ -1,162 +0,0 @@ -// Package comtrade define related functions for comtrade data processing -package comtrade - -import ( - "context" - "errors" - "fmt" - "path/filepath" - "strings" - "sync" - "time" - - "wave_record/config" - "wave_record/constant" - "wave_record/database" - "wave_record/util" - - "github.com/panjf2000/ants/v2" - "github.com/yonwoo9/go-comtrade" - "go.uber.org/zap" -) - -// ParseFunc define ants concourrent exec func -var ParseFunc = func(parseConfig interface{}) { - logger := zap.L() - comtradeStorageConfig, ok := parseConfig.(config.ComtradeDataStorageConfig) - if !ok { - // TODO 增加日志报错中输出文件名 - logger.Error("conversion parsing comtrade parameter type failed") - return - } - - comtradeData, err := parseComtradeData(comtradeStorageConfig.ConfigFilePath, comtradeStorageConfig.DataFilePath) - if err != nil { - // TODO 增加日志报错中输出文件名 - logger.Error("parsing comtrade data failed", zap.Error(err)) - return - } - - collection := database.MongoDBClient().Database(comtradeStorageConfig.DBName).Collection(comtradeData.Conf.StationName) - ctx, cancel := context.WithTimeout(comtradeStorageConfig.Ctx, 5*time.Second) - defer cancel() - - err = database. - StorageComtradeIntoMongoDB(ctx, comtradeData, collection, logger) - if err != nil { - // TODO 增加处理失败的文件名 - logger.Error("stroage comtrade data info mongoDB failed", zap.Error(err)) - } - comtradeStorageConfig.DelChan <- comtradeStorageConfig.ConfigFilePath - comtradeStorageConfig.DelChan <- comtradeStorageConfig.DataFilePath -} - -func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, error) { - logger := zap.L() - - comtrade, err := comtrade.ParseComtrade(configFilePath, dataFilePath) - if err != nil { - logger.Error("parse comtrade file failed", zap.Error(err)) - return nil, err - } - return comtrade, nil -} - -// ParseComtradeFile define comtrade file parse func -func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, addChan chan string, delChan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) { - logger := zap.L() - - for { - addFilePath, ok := <-addChan - if !ok { - logger.Error("monitor add channel closed", zap.Bool("channel_status", ok)) - return - } - - err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) - if err != nil { - if errors.Is(err, constant.ErrNotFindDatFile) { - logger.Info("process comtrade file failed", zap.String("err", err.Error())) - continue - } - logger.Error("process comtrade file failed", zap.Error(err)) - } - } -} - -func processComtradeFile(ctx context.Context, addFilePath string, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc, logger *zap.Logger) error { - lastElement := filepath.Base(addFilePath) - fileName := strings.Split(lastElement, ".")[0] - fileExtension := filepath.Ext(lastElement) - // logger.Info("add comtrade file info", zap.String("file_path", addFilePath), - // zap.String("file_name", fileName), zap.String("file_extension", fileExtension)) - - switch fileExtension { - case constant.ConfigFileSuffix: - err := processConfigFile(ctx, dbName, addFilePath, delChan, comtradeMap, pool) - if err != nil { - return fmt.Errorf("process comtrade failed:%w", err) - } - case constant.DataFileSuffix: - err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan) - if err != nil { - return fmt.Errorf("process comtrade failed:%w", err) - } - default: - logger.Warn("no support file style", zap.String("file_style", fileExtension)) - time.Sleep(5 * time.Second) - } - return nil -} - -func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error { - fmt.Printf("configFilePath:%v\n", configFilePath) - dataFilePath, err := util.FileNameConvert(configFilePath) - fmt.Printf("dataFilePath:%v\n", dataFilePath) - fmt.Printf("err:%v", err) - exist := util.FileExists(dataFilePath) - if !exist { - comtradeMap.Store(configFilePath, "") - return constant.ErrNotFindDatFile - } - - pool.Invoke(config.ComtradeDataStorageConfig{ - Ctx: ctx, - DelChan: delchan, - DBName: dbName, - ConfigFilePath: configFilePath, - DataFilePath: dataFilePath, - }) - return nil -} - -func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string) error { - configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "") - configFilePath := filepath.Join(monitorDir, configFileName) - exist := util.FileExists(configFilePath) - if exist { - comtradeMap.Store(configFilePath, dataFilePath) - addChan <- configFilePath - return nil - } - // addChan <- dataFilePath - return nil -} - -// MoveComtradeFile define comtrade file remove from comtradeMap and move comtrade file from monitor dir to backup dir func -func MoveComtradeFile(backupDir string, comtradeMap *sync.Map, delChan chan string) { - logger := zap.L() - for { - filePath := <-delChan - fileName := filepath.Base(filePath) - fileExtension := filepath.Ext(fileName) - if fileExtension == constant.ConfigFileSuffix { - comtradeMap.Delete(filePath) - } - backupFilePath := filepath.Join(backupDir, fileName) - err := util.RemoveFile(filePath, backupFilePath) - if err != nil { - logger.Error("remove file to backup dir failed", zap.Error(err)) - } - } -} diff --git a/wave_record/constant/errors.go b/wave_record/constant/errors.go index c983ce8..ed05a41 100644 --- a/wave_record/constant/errors.go +++ b/wave_record/constant/errors.go @@ -5,3 +5,6 @@ import "errors" // ErrNotFindDatFile define error for not find comtrade data file var ErrNotFindDatFile = errors.New("can not find dat file in map") + +// ErrNotSupportFileType define error for not support file type +var ErrNotSupportFileType = errors.New("not support file type") diff --git a/wave_record/util/file.go b/wave_record/util/file.go index 8d075eb..49f08d4 100644 --- a/wave_record/util/file.go +++ b/wave_record/util/file.go @@ -56,5 +56,5 @@ func FileNameConvert(filePath string) (string, error) { fileName := strings.Join([]string{name, constant.ConfigFileSuffix}, "") return filepath.Join(dirPath, fileName), nil } - return "", nil + return "", constant.ErrNotSupportFileType }