feat:add query func and query parameter struct for influxdb
This commit is contained in:
parent
500eeaf339
commit
5e42854213
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
"github.com/influxdata/influxdb-client-go/v2/api"
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WriterPointIntoInfluxDB return the result of storing point into influxdb
|
// WriterPointIntoInfluxDB return the result of storing point into influxdb
|
||||||
|
|
@ -16,3 +17,30 @@ func WriterPointIntoInfluxDB(ctx context.Context, writeAPI api.WriteAPIBlocking,
|
||||||
point := influxdb2.NewPoint(measurement, tags, fields, time.Now())
|
point := influxdb2.NewPoint(measurement, tags, fields, time.Now())
|
||||||
return writeAPI.WritePoint(ctx, point)
|
return writeAPI.WritePoint(ctx, point)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryPointFromInfluxDB return the result of query from influxdb by special parameters
|
||||||
|
// Get query client
|
||||||
|
// queryAPI := client.QueryAPI(org)
|
||||||
|
func QueryPointFromInfluxDB(ctx context.Context, queryAPI api.QueryAPI, parameters InfluxDBPara, logger *zap.Logger) ([]interface{}, error) {
|
||||||
|
dataList := make([]interface{}, 0)
|
||||||
|
// get query result
|
||||||
|
results, err := queryAPI.QueryWithParams(ctx, query, parameters)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("query data by parameters failed", zap.Error(err), zap.String("para", parameters.String()))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for results.Next() {
|
||||||
|
if results.TableChanged() {
|
||||||
|
logger.Info("find new result table", zap.String("table_metadata", results.TableMetadata().String()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dataList = append(dataList, results.Record().Value())
|
||||||
|
}
|
||||||
|
|
||||||
|
if results.Err() != nil {
|
||||||
|
logger.Error("query parsing error failed", zap.Error(results.Err()))
|
||||||
|
return nil, results.Err()
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,23 @@
|
||||||
|
// Package database define database operation functions
|
||||||
|
package database
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
var query = `from(bucket:params.bucket)
|
||||||
|
|> range(start: duration(params.start))
|
||||||
|
|> filter(fn: (r) => r._measurement == parames.measurement)
|
||||||
|
|> filter(fn: (r) => r._field == params.field)
|
||||||
|
|> filter(fn: (r) => r._value > params.value)`
|
||||||
|
|
||||||
|
// InfluxDBPara define struct of influxdb query parameters
|
||||||
|
type InfluxDBPara struct {
|
||||||
|
Bucket string `json:"bucket"`
|
||||||
|
Measurement string `json:"measurement"`
|
||||||
|
Start string `json:"start"`
|
||||||
|
Field string `json:"field"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i InfluxDBPara) String() string {
|
||||||
|
return fmt.Sprintf("bucket:%s, measurement:%s, start:%s, field:%s, value:%f", i.Bucket, i.Measurement, i.Start, i.Field, i.Value)
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue