fix(outputs.redistimeseries): Handle string fields correctly (#14060)
This commit is contained in:
parent
7673624bcd
commit
68eda258f4
|
|
@ -297,6 +297,7 @@ following works:
|
|||
- github.com/prometheus/prometheus [Apache License 2.0](https://github.com/prometheus/prometheus/blob/master/LICENSE)
|
||||
- github.com/rabbitmq/amqp091-go [BSD 2-Clause "Simplified" License](https://github.com/rabbitmq/amqp091-go/blob/main/LICENSE)
|
||||
- github.com/rcrowley/go-metrics [BSD 2-Clause with views sentence](https://github.com/rcrowley/go-metrics/blob/master/LICENSE)
|
||||
- github.com/redis/go-redis [BSD 2-Clause "Simplified" License](https://github.com/redis/go-redis/blob/master/LICENSE)
|
||||
- github.com/remyoudompheng/bigfft [BSD 3-Clause "New" or "Revised" License](https://github.com/remyoudompheng/bigfft/blob/master/LICENSE)
|
||||
- github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE)
|
||||
- github.com/robbiet480/go.nut [MIT License](https://github.com/robbiet480/go.nut/blob/master/LICENSE)
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -155,6 +155,7 @@ require (
|
|||
github.com/prometheus/procfs v0.11.0
|
||||
github.com/prometheus/prometheus v0.46.0
|
||||
github.com/rabbitmq/amqp091-go v1.8.1
|
||||
github.com/redis/go-redis/v9 v9.2.1
|
||||
github.com/riemann/riemann-go-client v0.5.1-0.20211206220514-f58f10cdce16
|
||||
github.com/robbiet480/go.nut v0.0.0-20220219091450-bd8f121e1fa1
|
||||
github.com/robinson/gos7 v0.0.0-20230421131203-d20ac6ca08cd
|
||||
|
|
|
|||
6
go.sum
6
go.sum
|
|
@ -346,6 +346,10 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
|
|||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0 h1:rwOJNZEGwMGbKziTcGpcoMdK0lfZE78lxR+UzLw+pRM=
|
||||
github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0/go.mod h1:i0ex6o3HhWHDSS0KEmRuHZOk3FVdJamzyk+tp3qmxkg=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/bufbuild/protocompile v0.6.0 h1:Uu7WiSQ6Yj9DbkdnOe7U4mNKp58y9WDMKDn28/ZlunY=
|
||||
github.com/bufbuild/protocompile v0.6.0/go.mod h1:YNP35qEYoYGme7QMtz5SBCoN4kL4g12jTtjuzRNdjpE=
|
||||
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
|
||||
|
|
@ -1316,6 +1320,8 @@ github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0
|
|||
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
|
||||
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
|
|
|
|||
|
|
@ -33,6 +33,14 @@ to use them.
|
|||
# password = ""
|
||||
# database = 0
|
||||
|
||||
## Timeout for operations such as ping or sending metrics
|
||||
# timeout = "10s"
|
||||
|
||||
## Enable attempt to convert string fields to numeric values
|
||||
## If "false" or in case the string value cannot be converted the string
|
||||
## field will be dropped.
|
||||
# convert_string_fields = true
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
|
|
|
|||
|
|
@ -2,14 +2,18 @@
|
|||
package redistimeseries
|
||||
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
|
@ -18,11 +22,13 @@ import (
|
|||
var sampleConfig string
|
||||
|
||||
type RedisTimeSeries struct {
|
||||
Address string `toml:"address"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
Database int `toml:"database"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
Address string `toml:"address"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
Database int `toml:"database"`
|
||||
ConvertStringFields bool `toml:"convert_string_fields"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
tls.ClientConfig
|
||||
client *redis.Client
|
||||
}
|
||||
|
|
@ -50,7 +56,9 @@ func (r *RedisTimeSeries) Connect() error {
|
|||
Password: password.String(),
|
||||
DB: r.Database,
|
||||
})
|
||||
return r.client.Ping().Err()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
|
||||
defer cancel()
|
||||
return r.client.Ping(ctx).Err()
|
||||
}
|
||||
|
||||
func (r *RedisTimeSeries) Close() error {
|
||||
|
|
@ -65,23 +73,40 @@ func (r *RedisTimeSeries) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
func (r *RedisTimeSeries) Write(metrics []telegraf.Metric) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
|
||||
defer cancel()
|
||||
|
||||
for _, m := range metrics {
|
||||
now := m.Time().UnixNano() / 1000000 // in milliseconds
|
||||
name := m.Name()
|
||||
for name, fv := range m.Fields() {
|
||||
key := m.Name() + "_" + name
|
||||
|
||||
var tags []interface{}
|
||||
for k, v := range m.Tags() {
|
||||
tags = append(tags, k, v)
|
||||
}
|
||||
var value float64
|
||||
switch v := fv.(type) {
|
||||
case float64:
|
||||
value = v
|
||||
case string:
|
||||
if !r.ConvertStringFields {
|
||||
r.Log.Debugf("Dropping string field %q of metric %q", name, m.Name())
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
value, err = strconv.ParseFloat(v, 64)
|
||||
if err != nil {
|
||||
r.Log.Debugf("Converting string field %q of metric %q failed: %v", name, m.Name(), err)
|
||||
continue
|
||||
}
|
||||
default:
|
||||
var err error
|
||||
value, err = internal.ToFloat64(v)
|
||||
if err != nil {
|
||||
r.Log.Errorf("Converting field %q (%T) of metric %q failed: %v", name, v, m.Name(), err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for fieldName, value := range m.Fields() {
|
||||
key := name + "_" + fieldName
|
||||
|
||||
addSlice := []interface{}{"TS.ADD", key, now, value}
|
||||
addSlice = append(addSlice, tags...)
|
||||
|
||||
if err := r.client.Do(addSlice...).Err(); err != nil {
|
||||
return fmt.Errorf("adding sample failed: %w", err)
|
||||
resp := r.client.TSAddWithArgs(ctx, key, m.Time().UnixMilli(), value, &redis.TSOptions{Labels: m.Tags()})
|
||||
if err := resp.Err(); err != nil {
|
||||
return fmt.Errorf("adding sample %q failed: %w", key, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -90,6 +115,9 @@ func (r *RedisTimeSeries) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
func init() {
|
||||
outputs.Add("redistimeseries", func() telegraf.Output {
|
||||
return &RedisTimeSeries{}
|
||||
return &RedisTimeSeries{
|
||||
ConvertStringFields: true,
|
||||
Timeout: config.Duration(10 * time.Second),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,35 +1,26 @@
|
|||
package redistimeseries
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/go-connections/nat"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
func TestConnectAndWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
address := testutil.GetLocalHost() + ":6379"
|
||||
redis := &RedisTimeSeries{
|
||||
Address: address,
|
||||
}
|
||||
|
||||
// Verify that we can connect to the RedisTimeSeries server
|
||||
err := redis.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that we can successfully write data to the RedisTimeSeries server
|
||||
err = redis.Write(testutil.MockMetrics())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestConnectAndWriteIntegration(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
|
|
@ -43,10 +34,137 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
|||
require.NoError(t, container.Start(), "failed to start container")
|
||||
defer container.Terminate()
|
||||
redis := &RedisTimeSeries{
|
||||
Address: fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]),
|
||||
Address: fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]),
|
||||
ConvertStringFields: true,
|
||||
Timeout: config.Duration(10 * time.Second),
|
||||
}
|
||||
// Verify that we can connect to the RedisTimeSeries server
|
||||
require.NoError(t, redis.Connect())
|
||||
// Verify that we can successfully write data to the RedisTimeSeries server
|
||||
require.NoError(t, redis.Write(testutil.MockMetrics()))
|
||||
}
|
||||
|
||||
func TestCases(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
const servicePort = "6379"
|
||||
// Get all testcase directories
|
||||
folders, err := os.ReadDir("testcases")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Register the plugin
|
||||
outputs.Add("redistimeseries", func() telegraf.Output {
|
||||
return &RedisTimeSeries{
|
||||
ConvertStringFields: true,
|
||||
Timeout: config.Duration(10 * time.Second),
|
||||
}
|
||||
})
|
||||
|
||||
for _, f := range folders {
|
||||
// Only handle folders
|
||||
if !f.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
t.Run(f.Name(), func(t *testing.T) {
|
||||
testcasePath := filepath.Join("testcases", f.Name())
|
||||
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
||||
inputFilename := filepath.Join(testcasePath, "input.influx")
|
||||
expectedFilename := filepath.Join(testcasePath, "expected.out")
|
||||
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
|
||||
|
||||
// Get parser to parse input and expected output
|
||||
parser := &influx.Parser{}
|
||||
require.NoError(t, parser.Init())
|
||||
|
||||
// Load the input data
|
||||
input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Read the expected output if any
|
||||
var expected []string
|
||||
if _, err := os.Stat(expectedFilename); err == nil {
|
||||
expected, err = testutil.ParseLinesFromFile(expectedFilename)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Read the expected output if any
|
||||
var expectedError string
|
||||
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
||||
expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, expectedErrors, 1)
|
||||
expectedError = expectedErrors[0]
|
||||
}
|
||||
|
||||
// Configure the plugin
|
||||
cfg := config.NewConfig()
|
||||
require.NoError(t, cfg.LoadConfig(configFilename))
|
||||
require.Len(t, cfg.Outputs, 1)
|
||||
|
||||
// Setup a test-container
|
||||
container := testutil.Container{
|
||||
Image: "redis/redis-stack-server:latest",
|
||||
ExposedPorts: []string{servicePort},
|
||||
Env: map[string]string{},
|
||||
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
|
||||
}
|
||||
require.NoError(t, container.Start(), "failed to start container")
|
||||
defer container.Terminate()
|
||||
|
||||
address := container.Address + ":" + container.Ports[servicePort]
|
||||
|
||||
// Setup the plugin
|
||||
plugin := cfg.Outputs[0].Output.(*RedisTimeSeries)
|
||||
plugin.Address = address
|
||||
plugin.Log = testutil.Logger{}
|
||||
|
||||
// Connect and write the metric(s)
|
||||
require.NoError(t, plugin.Connect())
|
||||
defer plugin.Close()
|
||||
|
||||
err = plugin.Write(input)
|
||||
if expectedError != "" {
|
||||
require.ErrorContains(t, err, expectedError)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// // Check the metric nevertheless as we might get some metrics despite errors.
|
||||
actual := getAllRecords(address)
|
||||
require.ElementsMatch(t, expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getAllRecords(address string) []string {
|
||||
client := redis.NewClient(&redis.Options{Addr: address})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var records []string
|
||||
keys := client.Keys(ctx, "*")
|
||||
for _, key := range keys.Val() {
|
||||
info := client.TSInfo(ctx, key)
|
||||
var labels string
|
||||
if l, found := info.Val()["labels"]; found {
|
||||
lmap := l.(map[interface{}]interface{})
|
||||
collection := make([]string, 0, len(lmap))
|
||||
for k, v := range lmap {
|
||||
collection = append(collection, fmt.Sprintf("%v=%v", k, v))
|
||||
}
|
||||
if len(collection) > 0 {
|
||||
labels = " " + strings.Join(collection, " ")
|
||||
}
|
||||
}
|
||||
|
||||
result := client.TSRange(ctx, key, 0, int(time.Now().UnixMilli()))
|
||||
for _, point := range result.Val() {
|
||||
records = append(records, fmt.Sprintf("%s: %f %d%s", result.Args()[1], point.Value, point.Timestamp, labels))
|
||||
}
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,14 @@
|
|||
# password = ""
|
||||
# database = 0
|
||||
|
||||
## Timeout for operations such as ping or sending metrics
|
||||
# timeout = "10s"
|
||||
|
||||
## Enable attempt to convert string fields to numeric values
|
||||
## If "false" or in case the string value cannot be converted the string
|
||||
## field will be dropped.
|
||||
# convert_string_fields = true
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
weather_temperature: 23.100000 1696489223000 location=somewhere
|
||||
weather_humidity: 52.300000 1696489223000 location=somewhere
|
||||
weather_windspeed: 3.200000 1696489223000 location=somewhere
|
||||
weather_temperature: 23.200000 1696489223100 location=somewhere
|
||||
weather_humidity: 52.100000 1696489223100 location=somewhere
|
||||
weather_windspeed: 13.200000 1696489223100 location=somewhere
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
weather,location=somewhere temperature=23.1,humidity=52.3,windspeed=3.2 1696489223000000000
|
||||
weather,location=somewhere temperature=23.2,humidity=52.1,windspeed=13.2 1696489223100000000
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
[[outputs.redistimeseries]]
|
||||
address = "127.0.0.1:6379"
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
weather_temperature: 23.100000 1696489223000 location=somewhere
|
||||
weather_humidity: 52.300000 1696489223000 location=somewhere
|
||||
weather_windspeed: 3.200000 1696489223000 location=somewhere
|
||||
weather_temperature: 23.200000 1696489223100 location=somewhere
|
||||
weather_humidity: 52.100000 1696489223100 location=somewhere
|
||||
weather_windspeed: 13.200000 1696489223100 location=somewhere
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
weather,location=somewhere temperature=23.1,humidity=52.3,windspeed=3.2 1696489223000000000
|
||||
weather,location=somewhereelse temperature=23.2,humidity=52.1,windspeed=13.2 1696489223100000000
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
[[outputs.redistimeseries]]
|
||||
address = "127.0.0.1:6379"
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
example_operational: 1.000000 1696489223000
|
||||
example_hours: 10.000000 1696489223000
|
||||
example_value: 42.000000 1696489223000
|
||||
|
|
@ -0,0 +1 @@
|
|||
example value=42i,status="OK",hours="10",operational=true 1696489223000000000
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
[[outputs.redistimeseries]]
|
||||
address = "127.0.0.1:6379"
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
example_operational: 1.000000 1696489223000
|
||||
example_value: 42.000000 1696489223000
|
||||
|
|
@ -0,0 +1 @@
|
|||
example value=42i,status="OK",hours="10",operational=true 1696489223000000000
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
[[outputs.redistimeseries]]
|
||||
address = "127.0.0.1:6379"
|
||||
convert_string_fields = false
|
||||
Loading…
Reference in New Issue