feat:add influxdb and mongodb operator funcs

This commit is contained in:
douxu 2024-08-01 15:57:28 +08:00
parent cf63e4a0ac
commit 500eeaf339
11 changed files with 181 additions and 43 deletions

View File

@ -20,6 +20,10 @@ type WaveRecordConfig struct {
ParseConcurrentQuantity int // parse comtrade file concurrent quantity
MongoDBURI string
MongoDBDataBase string
InfluxDBURL string
InfluxDBToken string
InfluxDBOrg string
InfluxDBBucket string
LCfg log.CutLogConfig // log config
}
@ -52,6 +56,13 @@ func ReadAndInitConfig(configDir, configName, configType string) (waveRecordConf
mongoDBPort := config.GetString("mongodb_port")
waveRecordConfig.MongoDBURI = strings.Join([]string{"mongodb://", mongoDBHost, ":", mongoDBPort}, "")
waveRecordConfig.MongoDBDataBase = config.GetString("mongodb_database")
// init influxdb config from config.yaml
influxDBHost := config.GetString("influxdb_host")
influxDBPort := config.GetString("influxdb_port")
waveRecordConfig.InfluxDBURL = strings.Join([]string{"http://", influxDBHost, ":", influxDBPort}, "")
waveRecordConfig.InfluxDBToken = config.GetString("influxdb_token")
waveRecordConfig.InfluxDBOrg = config.GetString("influxdb_org")
waveRecordConfig.InfluxDBBucket = config.GetString("influxdb_bucket")
// init zap log config from config.yaml
waveRecordConfig.LCfg.Mode = config.GetString("log_mode")
waveRecordConfig.LCfg.Level = config.GetString("log_level")

View File

@ -5,6 +5,12 @@ mongodb_host: "localhost"
mongodb_port: "27017"
mongodb_database: "wave_record"
influxdb_host: "localhost"
influxdb_port: "8086"
influxdb_token: ""
influxdb_org: "coslight"
influxdb_bucket: "wave_record"
log_mode: "development"
log_level: "debug"
log_filepath: "/home/douxu/log/wave_record-%s.log"

View File

@ -0,0 +1,36 @@
// Package database define database operation functions
package database
import (
"sync"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
var (
influxDBOnce sync.Once
_globalInfluxDBClient *influxdb2.Client
_globalInfluxDBMu sync.RWMutex
)
// InfluxDBClient returns the global InfluxDB client.It's safe for concurrent use.
func InfluxDBClient() *influxdb2.Client {
_globalInfluxDBMu.RLock()
client := _globalInfluxDBClient
_globalInfluxDBMu.RUnlock()
return client
}
// GetInfluxDBInstance return instance of InfluxDB client
func GetInfluxDBInstance(influxDBURL, token string) *influxdb2.Client {
mongoOnce.Do(func() {
_globalInfluxDBClient = initInfluxDBClient(influxDBURL, token)
})
return _globalInfluxDBClient
}
// initInfluxDBClient return successfully initialized InfluxDB client
func initInfluxDBClient(influxDBURL, token string) *influxdb2.Client {
influxDBClient := influxdb2.NewClient(influxDBURL, token)
return &influxDBClient
}

View File

@ -0,0 +1,18 @@
// Package database define database operation functions
package database
import (
"context"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
// WriterPointIntoInfluxDB return the result of storing point into influxdb
func WriterPointIntoInfluxDB(ctx context.Context, writeAPI api.WriteAPIBlocking, measurement string, tags map[string]string, fields map[string]interface{}) error {
// TODO 增加api创建示例
// aPI := client.WriteAPIBlocking("my-org", "my-bucket")
point := influxdb2.NewPoint(measurement, tags, fields, time.Now())
return writeAPI.WritePoint(ctx, point)
}

View File

@ -1,3 +1,4 @@
// Package database define database operation functions
package database
import (
@ -10,28 +11,28 @@ import (
)
var (
once sync.Once
_globalClient *mongo.Client
_globalMu sync.RWMutex
mongoOnce sync.Once
_globalMongoClient *mongo.Client
_globalMongoMu sync.RWMutex
)
// MongoDBClient returns the global mongoDB client.It's safe for concurrent use.
// MongoDBClient returns the global MongoDB client.It's safe for concurrent use.
func MongoDBClient() *mongo.Client {
_globalMu.RLock()
client := _globalClient
_globalMu.RUnlock()
_globalMongoMu.RLock()
client := _globalMongoClient
_globalMongoMu.RUnlock()
return client
}
// GetMongoDBInstance return instance of mongoDB client
// GetMongoDBInstance return instance of MongoDB client
func GetMongoDBInstance(ctx context.Context, mongoDBURI string) *mongo.Client {
once.Do(func() {
_globalClient = initMongoDBClient(ctx, mongoDBURI)
mongoOnce.Do(func() {
_globalMongoClient = initMongoDBClient(ctx, mongoDBURI)
})
return _globalClient
return _globalMongoClient
}
// initMongoDBClient return successfully initialized mongoDB client
// initMongoDBClient return successfully initialized MongoDB client
func initMongoDBClient(ctx context.Context, mongoDBURI string) *mongo.Client {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

View File

@ -0,0 +1,61 @@
// Package database define database operation functions
package database
import (
"context"
"fmt"
"wave_record/go-comtrade"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// StorageComtradeIntoMongoDB return the result of storing comtrade data into mongoDB
func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, collection *mongo.Collection, logger *zap.Logger) error {
comtradeBson, err := bson.Marshal(comtradeData)
if err != nil {
logger.Error("bson marshal comtrade data failed", zap.Error(err))
return err
}
_, err = collection.InsertOne(ctx, comtradeBson)
if err != nil {
logger.Error("insert comtrade data into mongoDB failed", zap.Error(err))
return err
}
return nil
}
// TODO 增加mongodb查找示例
// var key, value string
// doc := bson.D{} // 创建一个空的bson.D文档
// // 添加键值对
// doc = append(doc, bson.E{Key: key, Value: value})
//
// FindComtradeIntoMongoDB return the query results of comtrade data with specified conditions in MongoDB
func FindComtradeIntoMongoDB(ctx context.Context, filter bson.D, collection *mongo.Collection, logger *zap.Logger) ([]*comtrade.Comtrade, error) {
var results []*comtrade.Comtrade
cur, err := collection.Find(ctx, filter, nil)
if err != nil {
logger.Error("")
}
for cur.Next(ctx) {
var element comtrade.Comtrade
err := cur.Decode(&element)
if err != nil {
fmt.Println(err)
}
results = append(results, &element)
}
if err := cur.Err(); err != nil {
fmt.Println(err)
}
cur.Close(ctx)
return results, nil
}

View File

@ -1,27 +0,0 @@
package database
import (
"context"
"wave_record/go-comtrade"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// StorageComtradeIntoMongoDB return the result of storing comtrade data into mongoDB
func StorageComtradeIntoMongoDB(ctx context.Context, comtradeData *comtrade.Comtrade, collection *mongo.Collection, logger *zap.Logger) error {
comtradeBson, err := bson.Marshal(comtradeData)
if err != nil {
logger.Error("bson marshal comtrade data failed", zap.Error(err))
return err
}
_, err = collection.InsertOne(ctx, comtradeBson)
if err != nil {
logger.Error("insert comtrade data into mongoDB failed", zap.Error(err))
return err
}
return nil
}

View File

@ -1,10 +1,12 @@
// Package main define comtrade file analyzing example Functions
package main
import (
"flag"
"fmt"
"server.baseware.net/CL-Softwares/go-comtrade"
"wave_record/go-comtrade"
// "server.baseware.net/CL-Softwares/go-comtrade"
)
var (

View File

@ -7,19 +7,24 @@ require (
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/panjf2000/ants/v2 v2.10.0
github.com/spf13/viper v1.19.0
github.com/yonwoo9/go-comtrade v0.0.3
// github.com/yonwoo9/go-comtrade v0.0.3
go.mongodb.org/mongo-driver v1.16.0
go.uber.org/zap v1.21.0
)
require (
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
@ -36,6 +41,7 @@ require (
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect

View File

@ -1,7 +1,11 @@
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@ -14,8 +18,15 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM=
github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@ -33,6 +44,8 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo=
github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A=
github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8=
github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
@ -59,6 +72,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@ -79,8 +93,6 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yonwoo9/go-comtrade v0.0.3 h1:+Qbjn26MMXNacsGYdbYh4jNfMWiIdF8vRs6a0q/0xrU=
github.com/yonwoo9/go-comtrade v0.0.3/go.mod h1:FhyIfNGvvEfVXkCzzc5Xsp8HBtHK9AQxF0d2jkp5d4g=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
@ -113,6 +125,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View File

@ -1,3 +1,4 @@
// Package main define wave record project entry function
package main
import (
@ -13,6 +14,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/panjf2000/ants/v2"
)
@ -28,6 +30,7 @@ var (
delChan chan string
comtradeMap sync.Map
mongoDBClient *mongo.Client
influxDBClient *influxdb2.Client
logger *zap.Logger
)
@ -51,6 +54,13 @@ func main() {
}
}()
// init influxDBClient
influxDBClient = database.GetInfluxDBInstance(waveRecordConfig.InfluxDBURL, waveRecordConfig.InfluxDBToken)
defer func() {
client := *influxDBClient
client.Close()
}()
// init logger
logger = log.GetLoggerInstance(waveRecordConfig.LCfg)
defer logger.Sync()