From d0fe5f6982df1c39cb55e3c24526b9173c4a01bd Mon Sep 17 00:00:00 2001 From: douxu <921247973@qq.com> Date: Tue, 6 Aug 2024 17:03:41 +0800 Subject: [PATCH] feat:add standard generate template of influxdb query statment --- wave_record/config/config.yaml | 2 +- wave_record/database/influxdb_operator.go | 8 ++- wave_record/database/influxdb_para.go | 74 ++++++++++++++++++----- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/wave_record/config/config.yaml b/wave_record/config/config.yaml index 592edb8..24fad7d 100644 --- a/wave_record/config/config.yaml +++ b/wave_record/config/config.yaml @@ -7,7 +7,7 @@ mongodb_database: "wave_record" influxdb_host: "localhost" influxdb_port: "8086" -influxdb_token: "" +influxdb_token: "lCuiQ316qlly3iFeoi1EUokPJ0XxW-5lnG-3rXsKaaZSjfuxO5EaZfFdrNGM7Zlrdk1PrN_7TOsM_SCu9Onyew==" influxdb_org: "coslight" influxdb_bucket: "wave_record" diff --git a/wave_record/database/influxdb_operator.go b/wave_record/database/influxdb_operator.go index 94b2ec4..d00a434 100644 --- a/wave_record/database/influxdb_operator.go +++ b/wave_record/database/influxdb_operator.go @@ -21,12 +21,14 @@ func WriterPointIntoInfluxDB(ctx context.Context, writeAPI api.WriteAPIBlocking, // QueryPointFromInfluxDB return the result of query from influxdb by special parameters // Get query client // queryAPI := client.QueryAPI(org) -func QueryPointFromInfluxDB(ctx context.Context, queryAPI api.QueryAPI, parameters InfluxDBPara, logger *zap.Logger) ([]interface{}, error) { +func QueryPointFromInfluxDB(ctx context.Context, queryAPI api.QueryAPI, logger *zap.Logger) ([]interface{}, error) { dataList := make([]interface{}, 0) // get query result - results, err := queryAPI.QueryWithParams(ctx, query, parameters) + // TODO 增加初始化示例 + query := InitQueryByPara() + results, err := queryAPI.Query(ctx, query) if err != nil { - logger.Error("query data by parameters failed", zap.Error(err), zap.String("para", parameters.String())) + logger.Error("query data by parameters failed", zap.Error(err)) return nil, err } diff --git a/wave_record/database/influxdb_para.go b/wave_record/database/influxdb_para.go index 32c9553..44e3cf3 100644 --- a/wave_record/database/influxdb_para.go +++ b/wave_record/database/influxdb_para.go @@ -1,23 +1,65 @@ // Package database define database operation functions package database -import "fmt" +import ( + "fmt" + "strings" +) -var query = `from(bucket:params.bucket) -|> range(start: duration(params.start)) -|> filter(fn: (r) => r._measurement == parames.measurement) -|> filter(fn: (r) => r._field == params.field) -|> filter(fn: (r) => r._value > params.value)` - -// InfluxDBPara define struct of influxdb query parameters -type InfluxDBPara struct { - Bucket string `json:"bucket"` - Measurement string `json:"measurement"` - Start string `json:"start"` - Field string `json:"field"` - Value float64 `json:"value"` +// QueryPara define struct of influxdb query parameters +type InfluxDBQueryPara struct { + Bucket string `json:"bucket"` + Measurement string `json:"measurement"` + Start string `json:"start"` + Stop string `json:"stop"` + Field string `json:"field"` + // Value float64 `json:"value"` } -func (i InfluxDBPara) String() string { - return fmt.Sprintf("bucket:%s, measurement:%s, start:%s, field:%s, value:%f", i.Bucket, i.Measurement, i.Start, i.Field, i.Value) +type QueryPara interface { + apply(string) string +} + +type BucketQueryPara string + +func (b BucketQueryPara) apply(query string) string { + template := " from(bucket:\"{bucketId}\") " + template = strings.Replace(template, "{bucketId}", string(b), -1) + return query + template +} + +func WithBucket(bucketID string) BucketQueryPara { + return BucketQueryPara(bucketID) +} + +type MeasurementQueryPara string + +func (m MeasurementQueryPara) apply(query string) string { + template := " |> filter(fn: (r) => r[\"_measurement\"] == \"{measurement}\") " + template = strings.Replace(template, "{measurement}", string(m), -1) + return query + template +} + +func WithMeasurement(measurement string) MeasurementQueryPara { + return MeasurementQueryPara(measurement) +} + +type StartTimeQueryPara string + +func (s StartTimeQueryPara) apply(query string) string { + template := " |> range(start: {start}) " + template = strings.Replace(template, "{start}", string(s), -1) + return query + template +} + +func WithStartTime(measurement string) StartTimeQueryPara { + return StartTimeQueryPara(measurement) +} + +func InitQueryByPara(paras ...QueryPara) (queryCmd string) { + for _, para := range paras { + queryCmd = para.apply(queryCmd) + } + fmt.Printf("queryCmd:%v\n", queryCmd) + return }