PowerEngine/wave_record/fileparse/parse.go

159 lines
5.1 KiB
Go

// Package fileparse defines the functions that parse COMTRADE files, store the parsed data, and move processed files to a backup directory.
package fileparse
import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
"wave_record/config"
"wave_record/constant"
"wave_record/database"
"wave_record/go-comtrade"
"wave_record/util"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// ParseFunc is the task function executed concurrently by the ants worker pool.
//
// parseConfig must hold a config.ComtradeDataStorageConfig; any other dynamic
// type is logged as an error and the task ends. The function parses the
// config/data file pair named in the config, stores the result in the MongoDB
// collection named after the parsed station name (inside the database DBName)
// under a 5-second timeout derived from the config's Ctx, and finally sends
// both file paths on DelChan.
//
// If parsing fails nothing is sent on DelChan. If storing fails the error is
// logged and both paths are still sent on DelChan.
var ParseFunc = func(parseConfig interface{}) {
	log := zap.L()

	storageCfg, isStorageCfg := parseConfig.(config.ComtradeDataStorageConfig)
	if !isStorageCfg {
		log.Error("conversion comtrade parameter config type failed")
		return
	}

	// Only the base name is logged, not the full path.
	baseName := filepath.Base(storageCfg.ConfigFilePath)

	parsed, parseErr := parseComtradeData(storageCfg.ConfigFilePath, storageCfg.DataFilePath)
	if parseErr != nil {
		log.Error("parsing comtrade data failed", zap.String("fileName", baseName), zap.Error(parseErr))
		return
	}

	db := database.MongoDBClient().Database(storageCfg.DBName)
	stationCollection := db.Collection(parsed.Conf.StationName)

	// Bound the storage call so a slow database cannot hold the worker forever.
	storeCtx, cancelStore := context.WithTimeout(storageCfg.Ctx, 5*time.Second)
	defer cancelStore()

	if storeErr := database.StorageComtradeIntoMongoDB(storeCtx, parsed, stationCollection, log); storeErr != nil {
		log.Error("stroage comtrade data info mongoDB failed", zap.String("fileName", baseName), zap.Error(storeErr))
	}

	// Hand both files of the pair over on DelChan, config file first.
	for _, path := range [...]string{storageCfg.ConfigFilePath, storageCfg.DataFilePath} {
		storageCfg.DelChan <- path
	}
}
// parseComtradeData parses the COMTRADE config/data file pair at the given
// paths by delegating to comtrade.ParseComtrade.
//
// On failure it returns a nil result together with the parser's error wrapped
// with the config file path (the original error stays reachable through
// errors.Is / errors.As). The error is intentionally not logged here: the
// caller reports it, and logging at both layers would duplicate every failure.
func parseComtradeData(configFilePath, dataFilePath string) (*comtrade.Comtrade, error) {
	// The local is deliberately not called "comtrade" so that it does not
	// shadow the imported package of the same name.
	parsed, err := comtrade.ParseComtrade(configFilePath, dataFilePath)
	if err != nil {
		return nil, fmt.Errorf("parse comtrade file %q: %w", configFilePath, err)
	}
	return parsed, nil
}
// ParseComtradeFile receives file paths from addChan and passes each one to
// processComtradeFile. It runs until addChan is closed or ctx is cancelled, so
// it is meant to be started in its own goroutine.
//
// Parameters:
//   - ctx: cancels the loop and is forwarded to processComtradeFile.
//   - monitorDir, dbName, comtradeMap, delChan, pool: forwarded unchanged to
//     processComtradeFile.
//   - addChan: source of file paths; also forwarded to processComtradeFile.
//
// Errors from processComtradeFile never stop the loop. An error matching
// constant.ErrNotFindDatFile is logged at Info level; every other error is
// logged at Error level.
func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, addChan chan string, delChan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) {
	logger := zap.L()
	for {
		select {
		case <-ctx.Done():
			// Without this case the goroutine would stay blocked on addChan
			// after cancellation and could never be shut down.
			logger.Info("comtrade file parsing stopped", zap.Error(ctx.Err()))
			return
		case addFilePath, ok := <-addChan:
			if !ok {
				logger.Error("monitor add channel closed", zap.Bool("channel_status", ok))
				return
			}
			err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
			if err == nil {
				continue
			}
			if errors.Is(err, constant.ErrNotFindDatFile) {
				logger.Info("process comtrade file failed", zap.String("err", err.Error()))
				continue
			}
			logger.Error("process comtrade file failed", zap.Error(err))
		}
	}
}
// processComtradeFile routes one newly reported file to the handler matching
// its extension.
//
//   - constant.ConfigFileSuffix: handled by processConfigFile.
//   - constant.DataFileSuffix: handled by processDataFile.
//   - anything else: a warning is logged, the call sleeps for 5 seconds and
//     returns nil.
//
// A handler error is returned wrapped with %w, so callers can still match it
// with errors.Is.
func processComtradeFile(ctx context.Context, addFilePath string, monitorDir string, dbName string, comtradeMap *sync.Map, addChan chan string, delChan chan string, pool *ants.PoolWithFunc, logger *zap.Logger) error {
	lastElement := filepath.Base(addFilePath)
	fileExtension := filepath.Ext(lastElement)
	// Strip only the final extension. Splitting on the first "." would turn
	// "station.2024.dat" into "station" and make the paired config file lookup
	// use the wrong name.
	fileName := strings.TrimSuffix(lastElement, fileExtension)

	switch fileExtension {
	case constant.ConfigFileSuffix:
		if err := processConfigFile(ctx, dbName, addFilePath, delChan, comtradeMap, pool); err != nil {
			return fmt.Errorf("process comtrade failed:%w", err)
		}
	case constant.DataFileSuffix:
		if err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan); err != nil {
			return fmt.Errorf("process comtrade failed:%w", err)
		}
	default:
		logger.Warn("no support file style", zap.String("file_style", fileExtension))
		// NOTE(review): this sleep blocks the calling goroutine for 5 seconds
		// per unsupported file; kept as-is, confirm whether it is intended as
		// a throttle.
		time.Sleep(5 * time.Second)
	}
	return nil
}
// processConfigFile handles a config file path: it obtains the matching data
// file path from util.FileNameConvert and, when that file exists, submits a
// parse-and-store task for the pair to the worker pool.
//
// Returns:
//   - a wrapped error when util.FileNameConvert fails;
//   - constant.ErrNotFindDatFile when the data file does not exist; in that
//     case the config path is recorded in comtradeMap with an empty value;
//   - a wrapped error when the pool rejects the task;
//   - nil once the task has been submitted.
func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error {
	dataFilePath, err := util.FileNameConvert(configFilePath)
	if err != nil {
		return fmt.Errorf("process config file failed:%w", err)
	}

	if !util.FileExists(dataFilePath) {
		// Remember the config file whose data file has not shown up yet.
		comtradeMap.Store(configFilePath, "")
		return constant.ErrNotFindDatFile
	}

	// Invoke reports submission failures through its error result; dropping it
	// would make a file that was never queued look successfully handled.
	err = pool.Invoke(config.ComtradeDataStorageConfig{
		Ctx:            ctx,
		DelChan:        delchan,
		DBName:         dbName,
		ConfigFilePath: configFilePath,
		DataFilePath:   dataFilePath,
	})
	if err != nil {
		return fmt.Errorf("submit comtrade parse task failed:%w", err)
	}
	return nil
}
func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string) error {
configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "")
configFilePath := filepath.Join(monitorDir, configFileName)
exist := util.FileExists(configFilePath)
if exist {
comtradeMap.Store(configFilePath, dataFilePath)
addChan <- configFilePath
return nil
}
// addChan <- dataFilePath
return nil
}
// MoveComtradeFile receives file paths from delChan until the channel is
// closed. For each path it:
//
//  1. deletes the path from comtradeMap when its extension equals
//     constant.ConfigFileSuffix;
//  2. passes the path and its destination inside backupDir (same base name)
//     to util.RemoveFile — presumably a move into the backup directory; verify
//     against util.RemoveFile.
//
// A failure of util.RemoveFile is logged and does not stop the loop. The
// function blocks while delChan is open, so it is meant to be run in its own
// goroutine.
func MoveComtradeFile(backupDir string, comtradeMap *sync.Map, delChan chan string) {
	logger := zap.L()
	// Ranging ends when delChan is closed. A bare receive would instead yield
	// "" forever after close and spin on failing moves.
	for filePath := range delChan {
		fileName := filepath.Base(filePath)
		if filepath.Ext(fileName) == constant.ConfigFileSuffix {
			comtradeMap.Delete(filePath)
		}

		backupFilePath := filepath.Join(backupDir, fileName)
		if err := util.RemoveFile(filePath, backupFilePath); err != nil {
			logger.Error("remove file to backup dir failed", zap.String("filePath", filePath), zap.Error(err))
		}
	}
}