// Package main define wave record project entry function package main import ( "context" "flag" "sync" "wave_record/config" "wave_record/database" "wave_record/fileparse" "wave_record/log" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/panjf2000/ants/v2" ) var ( comtradeConfigDir = flag.String("comtrade_config_dir", "./config", "config file dir of wave record project") comtradeConfigName = flag.String("comtrade_config_name", "config", "config file name of wave record project") comtradeConfigType = flag.String("comtrade_config_type", "yaml", "config file type of wave record project") ) var ( waveRecordConfig config.WaveRecordConfig addChan chan string delChan chan string comtradeMap sync.Map mongoDBClient *mongo.Client influxDBClient *influxdb2.Client logger *zap.Logger ) func init() { addChan = make(chan string, 10) delChan = make(chan string, 10) } func main() { flag.Parse() ctx := context.TODO() waveRecordConfig = config.ReadAndInitConfig(*comtradeConfigDir, *comtradeConfigName, *comtradeConfigType) // init mongoDBClient mongoDBClient = database.GetMongoDBInstance(ctx, waveRecordConfig.MongoDBURI) defer func() { if err := mongoDBClient.Disconnect(ctx); err != nil { panic(err) } }() // init influxDBClient influxDBClient = database.GetInfluxDBInstance(waveRecordConfig.InfluxDBURL, waveRecordConfig.InfluxDBToken) defer func() { client := *influxDBClient client.Close() }() // init logger logger = log.GetLoggerInstance(waveRecordConfig.LCfg) defer logger.Sync() pool, err := ants.NewPoolWithFunc(waveRecordConfig.ParseConcurrentQuantity, fileparse.ParseFunc) if err != nil { logger.Error("init concurrent parse task pool failed", zap.Error(err)) panic(err) } defer ants.Release() go fileparse.DirWatch(waveRecordConfig.MonitorDir, addChan) go fileparse.MoveComtradeFile(waveRecordConfig.BackupDir, &comtradeMap, delChan) fileparse.TraverseMonitorDir(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, &comtradeMap, addChan, delChan, pool) fileparse.ParseComtradeFile(ctx, waveRecordConfig.MonitorDir, waveRecordConfig.MongoDBDataBase, addChan, delChan, &comtradeMap, pool) }