From 64ea68b47c0005cdd9e8779ae04649d4d6868fe8 Mon Sep 17 00:00:00 2001 From: douxu <921247973@qq.com> Date: Wed, 7 Aug 2024 16:26:24 +0800 Subject: [PATCH] feat:add influxdb query parameter interface and implementation --- wave_record/config/config.go | 2 +- wave_record/database/influxdb_para.go | 84 ++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/wave_record/config/config.go b/wave_record/config/config.go index a551027..5928ca4 100644 --- a/wave_record/config/config.go +++ b/wave_record/config/config.go @@ -13,7 +13,7 @@ import ( "github.com/spf13/viper" ) -// WaveRecordConfig is designed for wave record config struct +// WaveRecordConfig define config of wave record struct type WaveRecordConfig struct { MonitorDir string BackupDir string diff --git a/wave_record/database/influxdb_para.go b/wave_record/database/influxdb_para.go index 44e3cf3..f7de424 100644 --- a/wave_record/database/influxdb_para.go +++ b/wave_record/database/influxdb_para.go @@ -6,7 +6,7 @@ import ( "strings" ) -// QueryPara define struct of influxdb query parameters +// InfluxDBQueryPara define struct of influxdb query parameters type InfluxDBQueryPara struct { Bucket string `json:"bucket"` Measurement string `json:"measurement"` @@ -16,10 +16,12 @@ type InfluxDBQueryPara struct { // Value float64 `json:"value"` } +// QueryPara define influxdb query parameters interface type QueryPara interface { apply(string) string } +// BucketQueryPara define influxdb bucket query parameter type BucketQueryPara string func (b BucketQueryPara) apply(query string) string { @@ -28,10 +30,12 @@ func (b BucketQueryPara) apply(query string) string { return query + template } +// WithBucket define func of set bucket parameter in query statment func WithBucket(bucketID string) BucketQueryPara { return BucketQueryPara(bucketID) } +// MeasurementQueryPara define influxdb measurement query parameter type MeasurementQueryPara string func (m MeasurementQueryPara) apply(query string) string { @@ -40,22 +44,88 @@ func (m MeasurementQueryPara) apply(query string) string { return query + template } +// WithMeasurement define func of set measurement parameter in query statment func WithMeasurement(measurement string) MeasurementQueryPara { return MeasurementQueryPara(measurement) } -type StartTimeQueryPara string +// RangeTimeQueryPara define influxdb range query parameter +type RangeTimeQueryPara struct { + StartTime string + StopTime string +} -func (s StartTimeQueryPara) apply(query string) string { - template := " |> range(start: {start}) " - template = strings.Replace(template, "{start}", string(s), -1) +func (r RangeTimeQueryPara) apply(query string) string { + template := " |> range(start: {start}, stop: {stop}) " + template = strings.Replace(template, "{start}", r.StartTime, -1) + template = strings.Replace(template, "{stop}", r.StopTime, -1) return query + template } -func WithStartTime(measurement string) StartTimeQueryPara { - return StartTimeQueryPara(measurement) +// WithRangeTime define func of set range parameter in query statment +func WithRangeTime(startTime, stopTime string) RangeTimeQueryPara { + return RangeTimeQueryPara{StartTime: startTime, StopTime: stopTime} } +// FieldQueryPara define influxdb field query parameter +type FieldQueryPara struct { + Fields []string + Operator string +} + +func (f FieldQueryPara) apply(query string) string { + fieldTemplate := "r[\"_field\"] == \"{value}\"" + filterTemplate := " |> filter(fn: (r) => {field_template}) " + fieldBuilder := strings.Builder{} + for index, field := range f.Fields { + if index > 0 { + operator := " " + f.Operator + " " + fieldBuilder.Write([]byte(operator)) + } + filedContent := strings.Replace(fieldTemplate, "{value}", field, -1) + fieldBuilder.Write([]byte(filedContent)) + } + filterTemplate = strings.Replace(filterTemplate, "{field_template}", fieldBuilder.String(), -1) + return query + filterTemplate +} + +// WithField define func of set field parameter in query statment +func WithField(fields []string, operator string) FieldQueryPara { + return FieldQueryPara{Fields: fields, Operator: operator} +} + +// TagQueryPara define influxdb tag query parameter +type TagQueryPara struct { + Tag []string + Value []string + Operator string +} + +func (t TagQueryPara) apply(query string) string { + tagTemplate := "r[\"{tag}\"] == \"{value}\"" + filterTemplate := " |> filter(fn: (r) => {tag_template}) " + tagBuilder := strings.Builder{} + for i := 0; i < len(t.Tag); i++ { + if i > 0 { + operator := " " + t.Operator + " " + tagBuilder.Write([]byte(operator)) + } + tagContent := strings.Replace(tagTemplate, "{tag}", t.Tag[i], -1) + tagContent = strings.Replace(tagContent, "{value}", t.Value[i], -1) + fmt.Printf("tagContent:%v\n", tagContent) + tagBuilder.Write([]byte(tagContent)) + } + + filterTemplate = strings.Replace(filterTemplate, "{tag_template}", tagBuilder.String(), -1) + return query + filterTemplate +} + +// WithTag define func of set tag parameter in query statment +func WithTag(tag []string, value []string, operator string) TagQueryPara { + return TagQueryPara{Tag: tag, Value: value, Operator: operator} +} + +// InitQueryByPara define func of init influxdb query statment func InitQueryByPara(paras ...QueryPara) (queryCmd string) { for _, para := range paras { queryCmd = para.apply(queryCmd)