perf:resolve the issue of multiple insertions caused by duplicate notifications

This commit is contained in:
douxu 2024-07-30 16:11:40 +08:00
parent 5f68e61ff8
commit 056093d1c4
4 changed files with 49 additions and 28 deletions

View File

@ -7,6 +7,8 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"wave_record/constant"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
@ -62,7 +64,8 @@ func TraverseMonitorDir(ctx context.Context, monitorDir string, dbName string, c
return err return err
} }
if !info.IsDir() { fileSuffix := filepath.Ext(path)
if !info.IsDir() && fileSuffix == constant.ConfigFileSuffix {
processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) processComtradeFile(ctx, path, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
} }
return nil return nil

View File

@ -76,7 +76,7 @@ func ParseComtradeFile(ctx context.Context, monitorDir string, dbName string, ad
err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger) err := processComtradeFile(ctx, addFilePath, monitorDir, dbName, comtradeMap, addChan, delChan, pool, logger)
if err != nil { if err != nil {
if errors.Is(err, constant.ErrNotFindDatFile) { if errors.Is(err, constant.ErrNotFindDatFile) {
logger.Info("process comtrade file failed", zap.Error(err)) logger.Info("process comtrade file failed", zap.String("err", err.Error()))
continue continue
} }
logger.Error("process comtrade file failed", zap.Error(err)) logger.Error("process comtrade file failed", zap.Error(err))
@ -88,8 +88,8 @@ func processComtradeFile(ctx context.Context, addFilePath string, monitorDir str
lastElement := filepath.Base(addFilePath) lastElement := filepath.Base(addFilePath)
fileName := strings.Split(lastElement, ".")[0] fileName := strings.Split(lastElement, ".")[0]
fileExtension := filepath.Ext(lastElement) fileExtension := filepath.Ext(lastElement)
logger.Info("add comtrade file info", zap.String("file_path", addFilePath), // logger.Info("add comtrade file info", zap.String("file_path", addFilePath),
zap.String("file_name", fileName), zap.String("file_extension", fileExtension)) // zap.String("file_name", fileName), zap.String("file_extension", fileExtension))
switch fileExtension { switch fileExtension {
case constant.ConfigFileSuffix: case constant.ConfigFileSuffix:
@ -98,7 +98,7 @@ func processComtradeFile(ctx context.Context, addFilePath string, monitorDir str
return fmt.Errorf("process comtrade failed:%w", err) return fmt.Errorf("process comtrade failed:%w", err)
} }
case constant.DataFileSuffix: case constant.DataFileSuffix:
err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan, logger) err := processDataFile(monitorDir, fileName, addFilePath, comtradeMap, addChan)
if err != nil { if err != nil {
return fmt.Errorf("process comtrade failed:%w", err) return fmt.Errorf("process comtrade failed:%w", err)
} }
@ -110,38 +110,36 @@ func processComtradeFile(ctx context.Context, addFilePath string, monitorDir str
} }
func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error { func processConfigFile(ctx context.Context, dbName string, configFilePath string, delchan chan string, comtradeMap *sync.Map, pool *ants.PoolWithFunc) error {
dataFilePath, exist := comtradeMap.Load(configFilePath) fmt.Printf("configFilePath:%v\n", configFilePath)
if exist { dataFilePath, err := util.FileNameConvert(configFilePath)
if dataFilePath == "" { fmt.Printf("dataFilePath:%v\n", dataFilePath)
fmt.Println(11111) fmt.Printf("err:%v", err)
return constant.ErrNotFindDatFile exist := util.FileExists(dataFilePath)
} if !exist {
comtradeMap.Store(configFilePath, "")
pool.Invoke(config.ComtradeDataStorageConfig{ return constant.ErrNotFindDatFile
Ctx: ctx,
DelChan: delchan,
DBName: dbName,
ConfigFilePath: configFilePath,
DataFilePath: dataFilePath.(string),
})
return nil
} }
comtradeMap.Store(configFilePath, "")
fmt.Println(22222) pool.Invoke(config.ComtradeDataStorageConfig{
return constant.ErrNotFindDatFile Ctx: ctx,
DelChan: delchan,
DBName: dbName,
ConfigFilePath: configFilePath,
DataFilePath: dataFilePath,
})
return nil
} }
func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string, logger *zap.Logger) error { func processDataFile(monitorDir string, fileName string, dataFilePath string, comtradeMap *sync.Map, addChan chan string) error {
configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "") configFileName := strings.Join([]string{fileName, constant.ConfigFileSuffix}, "")
configFilePath := filepath.Join(monitorDir, configFileName) configFilePath := filepath.Join(monitorDir, configFileName)
logger.Info("config path of comtrade file", zap.String("file_path", configFilePath))
exist := util.FileExists(configFilePath) exist := util.FileExists(configFilePath)
if exist { if exist {
comtradeMap.Store(configFilePath, dataFilePath) comtradeMap.Store(configFilePath, dataFilePath)
addChan <- configFilePath addChan <- configFilePath
return nil return nil
} }
addChan <- dataFilePath // addChan <- dataFilePath
return nil return nil
} }

View File

@ -62,11 +62,11 @@ func main() {
} }
defer ants.Release() defer ants.Release()
go comtrade.TraverseMonitorDir(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, &comtradeMap, addChan, delChan, pool)
go comtrade.DirWatch(waveRecordConfig.MonitorDir, addChan) go comtrade.DirWatch(waveRecordConfig.MonitorDir, addChan)
go comtrade.MoveComtradeFile(waveRecordConfig.BackupDir, &comtradeMap, delChan) go comtrade.MoveComtradeFile(waveRecordConfig.BackupDir, &comtradeMap, delChan)
comtrade.TraverseMonitorDir(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, &comtradeMap, addChan, delChan, pool)
comtrade.ParseComtradeFile(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, addChan, delChan, &comtradeMap, pool) comtrade.ParseComtradeFile(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, addChan, delChan, &comtradeMap, pool)
} }

View File

@ -8,6 +8,10 @@ import (
"io/fs" "io/fs"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strings"
"wave_record/constant"
) )
// FileExists return boolean value result specifying whether the file path exists // FileExists return boolean value result specifying whether the file path exists
@ -38,3 +42,19 @@ func RemoveFile(src, dst string) error {
_, err := commandExec("mv", src, dst) _, err := commandExec("mv", src, dst)
return err return err
} }
// FileNameConvert return the absolute path of the converted config file or data file
func FileNameConvert(filePath string) (string, error) {
dirPath := filepath.Dir(filePath)
fileName := filepath.Base(filePath)
name := strings.Split(fileName, ".")[0]
suffix := filepath.Ext(fileName)
if suffix == constant.ConfigFileSuffix {
fileName := strings.Join([]string{name, constant.DataFileSuffix}, "")
return filepath.Join(dirPath, fileName), nil
} else if suffix == constant.DataFileSuffix {
fileName := strings.Join([]string{name, constant.ConfigFileSuffix}, "")
return filepath.Join(dirPath, fileName), nil
}
return "", nil
}