diff --git a/wave_record/util/dir_monitor.go b/wave_record/comtrade/dir_monitor.go similarity index 50% rename from wave_record/util/dir_monitor.go rename to wave_record/comtrade/dir_monitor.go index 2b68324..365f1f5 100644 --- a/wave_record/util/dir_monitor.go +++ b/wave_record/comtrade/dir_monitor.go @@ -1,10 +1,18 @@ -package util +// Package comtrade define related functions for comtrade data processing +package comtrade import ( + "context" + "os" + "path/filepath" + "sync" + "github.com/fsnotify/fsnotify" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) +// DirWatch define function for monitoring directories and passing add file name to channels func DirWatch(monitorDir string, addChan chan string) { logger := zap.L() @@ -44,3 +52,23 @@ func DirWatch(monitorDir string, addChan chan string) { } } } + +// TraverseMonitorDir define function for traverse the files existing in the directory +func TraverseMonitorDir(ctx context.Context, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc) { + logger := zap.L() + err := filepath.Walk(monitorDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + logger.Error("accessing a path failed", zap.Error(err)) + // TODO 嵌套error + return err + } + + if !info.IsDir() { + processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) + } + return nil + }) + if err != nil { + logger.Error("traversing files that already exist in the monitor directory failed", zap.Error(err)) + } +} diff --git a/wave_record/comtrade/parse.go b/wave_record/comtrade/parse.go index eeb3518..408821d 100644 --- a/wave_record/comtrade/parse.go +++ b/wave_record/comtrade/parse.go @@ -1,6 +1,10 @@ +// Package comtrade define related functions for comtrade data processing package comtrade import ( + "context" + "errors" + "fmt" "path/filepath" "strings" "sync" @@ -8,70 +12,43 @@ import ( "wave_record/config" "wave_record/constant" + "wave_record/database" "wave_record/util" + "github.com/panjf2000/ants/v2" "github.com/yonwoo9/go-comtrade" "go.uber.org/zap" ) +// ParseFunc define ants concourrent exec func var ParseFunc = func(parseConfig interface{}) { logger := zap.L() - comtradeConfig, ok := parseConfig.(config.ComtradeFileConfig) + comtradeStorageConfig, ok := parseConfig.(config.ComtradeDataStorageConfig) if !ok { // TODO 增加日志报错中输出文件名 logger.Error("conversion parsing comtrade parameter type failed") return } - parseComtradeData(comtradeConfig.ConfigFilePath, *&comtradeConfig.DatafilePath) -} - -func ParseComtradeFile(monitorDir string, addChan chan string, comtradeMap *sync.Map) { - logger := zap.L() - - for { - addFilePath, ok := <-addChan - if !ok { - logger.Error("monitor add channel closed", zap.Bool("channel_status", ok)) - return - } - - lastElement := filepath.Base(addFilePath) - fileName := strings.Split(lastElement, ".")[0] - fileExtension := filepath.Ext(lastElement) - logger.Info("add comtrade file info", zap.String("file_path", addFilePath), - zap.String("file_name", fileName), zap.String("file_extension", fileExtension)) - - switch fileExtension { - case constant.ConfigFileSuffix: - configFilePath := addFilePath - - dataFilePath, exist := comtradeMap.Load(addFilePath) - if exist { - if dataFilePath == "" { - continue - } - - go parseComtradeData(configFilePath, dataFilePath.(string)) - } else { - comtradeMap.Store(configFilePath, "") - } - case constant.DataFileSuffix: - configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "") - configFilePath := filepath.Join(monitorDir, configFileName) - logger.Info("config path of comtrade file", zap.String("file_path", configFilePath)) - exist := util.FileExists(configFilePath) - if exist { - comtradeMap.Store(configFilePath, addFilePath) - addChan <- configFilePath - continue - } - addChan <- addFilePath - default: - logger.Warn("no support file style", zap.String("file_style", fileExtension)) - time.Sleep(5 * time.Second) - } + comtradeData, err := parseComtradeData(comtradeStorageConfig.ConfigFilePath, comtradeStorageConfig.DataFilePath) + if err != nil { + // TODO 增加日志报错中输出文件名 + logger.Error("parsing comtrade data failed", zap.Error(err)) + return } + + collection := database.MongoDBClient().Database(comtradeStorageConfig.DBName).Collection(comtradeData.Conf.StationName) + ctx, cancel := context.WithTimeout(comtradeStorageConfig.Ctx, 5*time.Second) + defer cancel() + + err = database. + StorageComtradeIntoMongoDB(ctx, comtradeData, collection, logger) + if err != nil { + // TODO 增加处理失败的文件名 + logger.Error("stroage comtrade data info mongoDB failed", zap.Error(err)) + } + comtradeStorageConfig.DelChan <- comtradeStorageConfig.ConfigFilePath + comtradeStorageConfig.DelChan <- comtradeStorageConfig.DataFilePath } func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, error) { @@ -84,3 +61,102 @@ func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, } return comtrade, nil } + +// ParseComtradeFile define comtrade file parse func +func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, addChan chan string, delChan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) { + logger := zap.L() + + for { + addFilePath, ok := <-addChan + if !ok { + logger.Error("monitor add channel closed", zap.Bool("channel_status", ok)) + return + } + + err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) + if errors.Is(err, nil) { + // TODO 写log error报警 + logger.Error("process comtrade file failed", zap.Error(err)) + continue + } + } +} + +func processComtradeFile(ctx context.Context, addFilePath string, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc, logger *zap.Logger) error { + lastElement := filepath.Base(addFilePath) + fileName := strings.Split(lastElement, ".")[0] + fileExtension := filepath.Ext(lastElement) + logger.Info("add comtrade file info", zap.String("file_path", addFilePath), + zap.String("file_name", fileName), zap.String("file_extension", fileExtension)) + + switch fileExtension { + case constant.ConfigFileSuffix: + err := processConfigFile(ctx, dbName, addFilePath, delChan, comtradeMap, pool) + if err != nil { + // TODO 对error进行嵌套返回 + return fmt.Errorf("%w", err) + } + case constant.DataFileSuffix: + err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan, logger) + if err != nil { + // TODO 对error进行嵌套返回 + return fmt.Errorf("%w", err) + } + default: + logger.Warn("no support file style", zap.String("file_style", fileExtension)) + time.Sleep(5 * time.Second) + } + return nil +} + +func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error { + dataFilePath, exist := comtradeMap.Load(configFilePath) + if exist { + if dataFilePath == "" { + return errors.New("can not find dat file in map") + } + + pool.Invoke(config.ComtradeDataStorageConfig{ + Ctx: ctx, + DelChan: delchan, + DBName: dbName, + ConfigFilePath: configFilePath, + DataFilePath: dataFilePath.(string), + }) + return nil + } + comtradeMap.Store(configFilePath, "") + return nil +} + +func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string, logger *zap.Logger) error { + configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "") + configFilePath := filepath.Join(monitorDir, configFileName) + logger.Info("config path of comtrade file", zap.String("file_path", configFilePath)) + exist := util.FileExists(configFilePath) + if exist { + comtradeMap.Store(configFilePath, dataFilePath) + addChan <- configFilePath + return nil + } + addChan <- dataFilePath + return nil +} + +// MoveComtradeFile define comtrade file remove from comtradeMap and move comtrade file from monitor dir to backup dir func +func MoveComtradeFile(backupDir string, comtradeMap *sync.Map, delChan chan string) { + logger := zap.L() + for { + filePath := <-delChan + fileName := filepath.Base(filePath) + fileExtension := filepath.Ext(fileName) + if fileExtension == constant.ConfigFileSuffix { + comtradeMap.Delete(filePath) + } + backupFilePath := filepath.Join(backupDir, fileName) + err := util.RemoveFile(filePath, backupFilePath) + if err != nil { + logger.Error("remove file to backup dir failed", zap.Error(err)) + } + } +} diff --git a/wave_record/config/config.go b/wave_record/config/config.go index 34022c1..23bde0d 100644 --- a/wave_record/config/config.go +++ b/wave_record/config/config.go @@ -2,6 +2,7 @@ package config import ( + "context" "strings" "wave_record/log" @@ -12,16 +13,20 @@ import ( // WaveRecordConfig is designed for wave record config struct type WaveRecordConfig struct { MonitorDir string + BackupDir string ParseConcurrentQuantity int // parse comtrade file concurrent quantity MongoDBURI string MongoDBDataBase string LCfg log.LogConfig // log config } -// ComtradeFileConfig define config struct of parse comtrade data -type ComtradeFileConfig struct { +// ComtradeDataStorageConfig define config struct of storage comtrade data +type ComtradeDataStorageConfig struct { + Ctx context.Context + DelChan chan string + DBName string ConfigFilePath string - DatafilePath string + DataFilePath string } // ReadAndInitConfig return wave record project config struct @@ -38,6 +43,7 @@ func ReadAndInitConfig(configDir, configName, configType string) (waveRecordConf } waveRecordConfig.MonitorDir = config.GetString("comtrade_monitor_dir") + waveRecordConfig.BackupDir = config.GetString("comtrade_backup_dir") // init mongodb config from config.yaml mongoDBHost := config.GetString("mongodb_host") mongoDBPort := config.GetString("mongodb_port") diff --git a/wave_record/config/config.yaml b/wave_record/config/config.yaml index 2edc1c7..9bb576b 100644 --- a/wave_record/config/config.yaml +++ b/wave_record/config/config.yaml @@ -1,11 +1,12 @@ -comtrade_monitor_dir: "/tmp" +comtrade_monitor_dir: "/home/douxu/comtrade_file/" +comtrade_backup_dir: "/home/douxu/comtrade_file_backup/" mongodb_host: "localhost" mongodb_port: "27017" mongodb_database: "wave_record" log_level: "debug" -log_filename: "/log/wave_record.log" +log_filename: "/home/douxu/log/wave_record.log" log_maxsize: 1 log_maxbackups: 5 log_maxage: 30 diff --git a/wave_record/database/init.go b/wave_record/database/init.go index c25a92c..d8de788 100644 --- a/wave_record/database/init.go +++ b/wave_record/database/init.go @@ -10,16 +10,25 @@ import ( ) var ( - client *mongo.Client - once sync.Once + once sync.Once + _globalClient *mongo.Client + _globalMu sync.RWMutex ) +// MongoDBClient returns the global mongoDB client.It's safe for concurrent use. +func MongoDBClient() *mongo.Client { + _globalMu.RLock() + client := _globalClient + _globalMu.RUnlock() + return client +} + // GetMongoDBInstance return instance of mongoDB client func GetMongoDBInstance(ctx context.Context, mongoDBURI string) *mongo.Client { once.Do(func() { - client = initMongoDBClient(ctx, mongoDBURI) + _globalClient = initMongoDBClient(ctx, mongoDBURI) }) - return client + return _globalClient } // initMongoDBClient return successfully initialized mongoDB client diff --git a/wave_record/database/mongodb_store.go b/wave_record/database/mongodb_store.go index 1b342c1..c3d06b5 100644 --- a/wave_record/database/mongodb_store.go +++ b/wave_record/database/mongodb_store.go @@ -2,7 +2,6 @@ package database import ( "context" - "time" "github.com/yonwoo9/go-comtrade" "go.mongodb.org/mongo-driver/bson" @@ -11,11 +10,7 @@ import ( ) // StorageComtradeIntoMongoDB return the result of storing comtrade data into mongoDB -func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, dbName string, client *mongo.Client, logger *zap.Logger) error { - collection := client.Database(dbName).Collection(comtradeData.Conf.StationName) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - +func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, collection *mongo.Collection, logger *zap.Logger) error { comtradeBson, err := bson.Marshal(comtradeData) if err != nil { logger.Error("bson marshal comtrade data failed", zap.Error(err)) diff --git a/wave_record/main.go b/wave_record/main.go index 64251f1..b56317f 100644 --- a/wave_record/main.go +++ b/wave_record/main.go @@ -3,14 +3,12 @@ package main import ( "context" "flag" - "fmt" "sync" "wave_record/comtrade" "wave_record/config" "wave_record/database" "wave_record/log" - "wave_record/util" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" @@ -26,14 +24,16 @@ var ( var ( waveRecordConfig config.WaveRecordConfig - addChannel chan string + addChan chan string + delChan chan string comtradeMap sync.Map mongoDBClient *mongo.Client logger *zap.Logger ) func init() { - addChannel = make(chan string, 10) + addChan = make(chan string, 10) + delChan = make(chan string, 10) } func main() { @@ -60,17 +60,15 @@ func main() { pool, err := ants.NewPoolWithFunc(waveRecordConfig.ParseConcurrentQuantity, comtrade.ParseFunc) if err != nil { - fmt.Println(err) - return + logger.Error("init concurrent parse task pool failed", zap.Error(err)) + panic(err) } - for i := 0; i < 15; i++ { - if err = pool.Invoke(i); err != nil { - fmt.Println(err) - } - } + go comtrade.TraverseMonitorDir(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, &comtradeMap, addChan, delChan, pool) - go util.DirWatch(waveRecordConfig.MonitorDir, addChannel) + go comtrade.DirWatch(waveRecordConfig.MonitorDir, addChan) - comtrade.ParseComtradeFile(waveRecordConfig.MonitorDir, addChannel, &comtradeMap) + go comtrade.MoveComtradeFile(waveRecordConfig.BackupDir, &comtradeMap, delChan) + + comtrade.ParseComtradeFile(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, addChan, delChan, &comtradeMap, pool) } diff --git a/wave_record/util/file.go b/wave_record/util/file.go index 8c34bbd..03e5d3f 100644 --- a/wave_record/util/file.go +++ b/wave_record/util/file.go @@ -2,9 +2,12 @@ package util import ( + "bytes" "errors" + "fmt" "io/fs" "os" + "os/exec" ) // FileExists return boolean value result specifying whether the file path exists @@ -13,3 +16,25 @@ func FileExists(filePath string) bool { // return !os.IsNotExist(err) return !errors.Is(err, fs.ErrNotExist) } + +func commandExec(name string, args ...string) (outString string, err error) { + var stdout bytes.Buffer + var stderr bytes.Buffer + + cmd := exec.Command(name, args...) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err = cmd.Run() + outString = stdout.String() + if err != nil { + err = fmt.Errorf("exec failed:%v,stderr=%s. name=%s,args=%v", err, stderr.String(), name, args) + } + return +} + +// RemoveFile return file movement result +func RemoveFile(src, dst string) error { + _, err := commandExec("mv", src, dst) + return err +}