feat:add processing for files that already exist in the monitoring directory

This commit is contained in:
douxu 2024-07-25 16:22:27 +08:00
parent bc06dedfc0
commit 3094ba679e
8 changed files with 216 additions and 78 deletions

View File

@ -1,10 +1,18 @@
package util
// Package comtrade define related functions for comtrade data processing
package comtrade
import (
"context"
"os"
"path/filepath"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// DirWatch define function for monitoring directories and passing add file name to channels
func DirWatch(monitorDir string, addChan chan string) {
logger := zap.L()
@ -44,3 +52,23 @@ func DirWatch(monitorDir string, addChan chan string) {
}
}
}
// TraverseMonitorDir define function for traverse the files existing in the directory
func TraverseMonitorDir(ctx context.Context, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc) {
logger := zap.L()
err := filepath.Walk(monitorDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
logger.Error("accessing a path failed", zap.Error(err))
// TODO 嵌套error
return err
}
if !info.IsDir() {
processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
}
return nil
})
if err != nil {
logger.Error("traversing files that already exist in the monitor directory failed", zap.Error(err))
}
}

View File

@ -1,6 +1,10 @@
// Package comtrade define related functions for comtrade data processing
package comtrade
import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
@ -8,70 +12,43 @@ import (
"wave_record/config"
"wave_record/constant"
"wave_record/database"
"wave_record/util"
"github.com/panjf2000/ants/v2"
"github.com/yonwoo9/go-comtrade"
"go.uber.org/zap"
)
// ParseFunc define ants concourrent exec func
var ParseFunc = func(parseConfig interface{}) {
logger := zap.L()
comtradeConfig, ok := parseConfig.(config.ComtradeFileConfig)
comtradeStorageConfig, ok := parseConfig.(config.ComtradeDataStorageConfig)
if !ok {
// TODO 增加日志报错中输出文件名
logger.Error("conversion parsing comtrade parameter type failed")
return
}
parseComtradeData(comtradeConfig.ConfigFilePath, *&comtradeConfig.DatafilePath)
}
func ParseComtradeFile(monitorDir string, addChan chan string, comtradeMap *sync.Map) {
logger := zap.L()
for {
addFilePath, ok := <-addChan
if !ok {
logger.Error("monitor add channel closed", zap.Bool("channel_status", ok))
comtradeData, err := parseComtradeData(comtradeStorageConfig.ConfigFilePath, comtradeStorageConfig.DataFilePath)
if err != nil {
// TODO 增加日志报错中输出文件名
logger.Error("parsing comtrade data failed", zap.Error(err))
return
}
lastElement := filepath.Base(addFilePath)
fileName := strings.Split(lastElement, ".")[0]
fileExtension := filepath.Ext(lastElement)
logger.Info("add comtrade file info", zap.String("file_path", addFilePath),
zap.String("file_name", fileName), zap.String("file_extension", fileExtension))
collection := database.MongoDBClient().Database(comtradeStorageConfig.DBName).Collection(comtradeData.Conf.StationName)
ctx, cancel := context.WithTimeout(comtradeStorageConfig.Ctx, 5*time.Second)
defer cancel()
switch fileExtension {
case constant.ConfigFileSuffix:
configFilePath := addFilePath
dataFilePath, exist := comtradeMap.Load(addFilePath)
if exist {
if dataFilePath == "" {
continue
}
go parseComtradeData(configFilePath, dataFilePath.(string))
} else {
comtradeMap.Store(configFilePath, "")
}
case constant.DataFileSuffix:
configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "")
configFilePath := filepath.Join(monitorDir, configFileName)
logger.Info("config path of comtrade file", zap.String("file_path", configFilePath))
exist := util.FileExists(configFilePath)
if exist {
comtradeMap.Store(configFilePath, addFilePath)
addChan <- configFilePath
continue
}
addChan <- addFilePath
default:
logger.Warn("no support file style", zap.String("file_style", fileExtension))
time.Sleep(5 * time.Second)
}
err = database.
StorageComtradeIntoMongoDB(ctx, comtradeData, collection, logger)
if err != nil {
// TODO 增加处理失败的文件名
logger.Error("stroage comtrade data info mongoDB failed", zap.Error(err))
}
comtradeStorageConfig.DelChan <- comtradeStorageConfig.ConfigFilePath
comtradeStorageConfig.DelChan <- comtradeStorageConfig.DataFilePath
}
func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, error) {
@ -84,3 +61,102 @@ func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade,
}
return comtrade, nil
}
// ParseComtradeFile define comtrade file parse func
func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, addChan chan string, delChan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) {
logger := zap.L()
for {
addFilePath, ok := <-addChan
if !ok {
logger.Error("monitor add channel closed", zap.Bool("channel_status", ok))
return
}
err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
if errors.Is(err, nil) {
// TODO 写log error报警
logger.Error("process comtrade file failed", zap.Error(err))
continue
}
}
}
func processComtradeFile(ctx context.Context, addFilePath string, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc, logger *zap.Logger) error {
lastElement := filepath.Base(addFilePath)
fileName := strings.Split(lastElement, ".")[0]
fileExtension := filepath.Ext(lastElement)
logger.Info("add comtrade file info", zap.String("file_path", addFilePath),
zap.String("file_name", fileName), zap.String("file_extension", fileExtension))
switch fileExtension {
case constant.ConfigFileSuffix:
err := processConfigFile(ctx, dbName, addFilePath, delChan, comtradeMap, pool)
if err != nil {
// TODO 对error进行嵌套返回
return fmt.Errorf("%w", err)
}
case constant.DataFileSuffix:
err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan, logger)
if err != nil {
// TODO 对error进行嵌套返回
return fmt.Errorf("%w", err)
}
default:
logger.Warn("no support file style", zap.String("file_style", fileExtension))
time.Sleep(5 * time.Second)
}
return nil
}
func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error {
dataFilePath, exist := comtradeMap.Load(configFilePath)
if exist {
if dataFilePath == "" {
return errors.New("can not find dat file in map")
}
pool.Invoke(config.ComtradeDataStorageConfig{
Ctx: ctx,
DelChan: delchan,
DBName: dbName,
ConfigFilePath: configFilePath,
DataFilePath: dataFilePath.(string),
})
return nil
}
comtradeMap.Store(configFilePath, "")
return nil
}
func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string, logger *zap.Logger) error {
configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "")
configFilePath := filepath.Join(monitorDir, configFileName)
logger.Info("config path of comtrade file", zap.String("file_path", configFilePath))
exist := util.FileExists(configFilePath)
if exist {
comtradeMap.Store(configFilePath, dataFilePath)
addChan <- configFilePath
return nil
}
addChan <- dataFilePath
return nil
}
// MoveComtradeFile define comtrade file remove from comtradeMap and move comtrade file from monitor dir to backup dir func
func MoveComtradeFile(backupDir string, comtradeMap *sync.Map, delChan chan string) {
logger := zap.L()
for {
filePath := <-delChan
fileName := filepath.Base(filePath)
fileExtension := filepath.Ext(fileName)
if fileExtension == constant.ConfigFileSuffix {
comtradeMap.Delete(filePath)
}
backupFilePath := filepath.Join(backupDir, fileName)
err := util.RemoveFile(filePath, backupFilePath)
if err != nil {
logger.Error("remove file to backup dir failed", zap.Error(err))
}
}
}

View File

@ -2,6 +2,7 @@
package config
import (
"context"
"strings"
"wave_record/log"
@ -12,16 +13,20 @@ import (
// WaveRecordConfig is designed for wave record config struct
type WaveRecordConfig struct {
MonitorDir string
BackupDir string
ParseConcurrentQuantity int // parse comtrade file concurrent quantity
MongoDBURI string
MongoDBDataBase string
LCfg log.LogConfig // log config
}
// ComtradeFileConfig define config struct of parse comtrade data
type ComtradeFileConfig struct {
// ComtradeDataStorageConfig define config struct of storage comtrade data
type ComtradeDataStorageConfig struct {
Ctx context.Context
DelChan chan string
DBName string
ConfigFilePath string
DatafilePath string
DataFilePath string
}
// ReadAndInitConfig return wave record project config struct
@ -38,6 +43,7 @@ func ReadAndInitConfig(configDir, configName, configType string) (waveRecordConf
}
waveRecordConfig.MonitorDir = config.GetString("comtrade_monitor_dir")
waveRecordConfig.BackupDir = config.GetString("comtrade_backup_dir")
// init mongodb config from config.yaml
mongoDBHost := config.GetString("mongodb_host")
mongoDBPort := config.GetString("mongodb_port")

View File

@ -1,11 +1,12 @@
comtrade_monitor_dir: "/tmp"
comtrade_monitor_dir: "/home/douxu/comtrade_file/"
comtrade_backup_dir: "/home/douxu/comtrade_file_backup/"
mongodb_host: "localhost"
mongodb_port: "27017"
mongodb_database: "wave_record"
log_level: "debug"
log_filename: "/log/wave_record.log"
log_filename: "/home/douxu/log/wave_record.log"
log_maxsize: 1
log_maxbackups: 5
log_maxage: 30

View File

@ -10,16 +10,25 @@ import (
)
var (
client *mongo.Client
once sync.Once
_globalClient *mongo.Client
_globalMu sync.RWMutex
)
// MongoDBClient returns the global mongoDB client.It's safe for concurrent use.
func MongoDBClient() *mongo.Client {
_globalMu.RLock()
client := _globalClient
_globalMu.RUnlock()
return client
}
// GetMongoDBInstance return instance of mongoDB client
func GetMongoDBInstance(ctx context.Context, mongoDBURI string) *mongo.Client {
once.Do(func() {
client = initMongoDBClient(ctx, mongoDBURI)
_globalClient = initMongoDBClient(ctx, mongoDBURI)
})
return client
return _globalClient
}
// initMongoDBClient return successfully initialized mongoDB client

View File

@ -2,7 +2,6 @@ package database
import (
"context"
"time"
"github.com/yonwoo9/go-comtrade"
"go.mongodb.org/mongo-driver/bson"
@ -11,11 +10,7 @@ import (
)
// StorageComtradeIntoMongoDB return the result of storing comtrade data into mongoDB
func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, dbName string, client *mongo.Client, logger *zap.Logger) error {
collection := client.Database(dbName).Collection(comtradeData.Conf.StationName)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, collection *mongo.Collection, logger *zap.Logger) error {
comtradeBson, err := bson.Marshal(comtradeData)
if err != nil {
logger.Error("bson marshal comtrade data failed", zap.Error(err))

View File

@ -3,14 +3,12 @@ package main
import (
"context"
"flag"
"fmt"
"sync"
"wave_record/comtrade"
"wave_record/config"
"wave_record/database"
"wave_record/log"
"wave_record/util"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
@ -26,14 +24,16 @@ var (
var (
waveRecordConfig config.WaveRecordConfig
addChannel chan string
addChan chan string
delChan chan string
comtradeMap sync.Map
mongoDBClient *mongo.Client
logger *zap.Logger
)
func init() {
addChannel = make(chan string, 10)
addChan = make(chan string, 10)
delChan = make(chan string, 10)
}
func main() {
@ -60,17 +60,15 @@ func main() {
pool, err := ants.NewPoolWithFunc(waveRecordConfig.ParseConcurrentQuantity, comtrade.ParseFunc)
if err != nil {
fmt.Println(err)
return
logger.Error("init concurrent parse task pool failed", zap.Error(err))
panic(err)
}
for i := 0; i < 15; i++ {
if err = pool.Invoke(i); err != nil {
fmt.Println(err)
}
}
go comtrade.TraverseMonitorDir(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, &comtradeMap, addChan, delChan, pool)
go util.DirWatch(waveRecordConfig.MonitorDir, addChannel)
go comtrade.DirWatch(waveRecordConfig.MonitorDir, addChan)
comtrade.ParseComtradeFile(waveRecordConfig.MonitorDir, addChannel, &comtradeMap)
go comtrade.MoveComtradeFile(waveRecordConfig.BackupDir, &comtradeMap, delChan)
comtrade.ParseComtradeFile(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, addChan, delChan, &comtradeMap, pool)
}

View File

@ -2,9 +2,12 @@
package util
import (
"bytes"
"errors"
"fmt"
"io/fs"
"os"
"os/exec"
)
// FileExists return boolean value result specifying whether the file path exists
@ -13,3 +16,25 @@ func FileExists(filePath string) bool {
// return !os.IsNotExist(err)
return !errors.Is(err, fs.ErrNotExist)
}
func commandExec(name string, args ...string) (outString string, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command(name, args...)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err = cmd.Run()
outString = stdout.String()
if err != nil {
err = fmt.Errorf("exec failed:%v,stderr=%s. name=%s,args=%v", err, stderr.String(), name, args)
}
return
}
// RemoveFile return file movement result
func RemoveFile(src, dst string) error {
_, err := commandExec("mv", src, dst)
return err
}