feat(parsers.opentsdb): Add OpenTSDB data format parser (#13167)

This commit is contained in:
Nick Thomas 2023-05-05 14:14:51 +01:00 committed by GitHub
parent 66fa382f2c
commit a808b9f077
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 466 additions and 2 deletions

View File

@ -17,6 +17,7 @@ Protocol, JSON format, or Apache Avro format.
- [JSON v2](/plugins/parsers/json_v2)
- [Logfmt](/plugins/parsers/logfmt)
- [Nagios](/plugins/parsers/nagios)
- [OpenTSDB](/plugins/parsers/opentsdb)
- [Prometheus](/plugins/parsers/prometheus)
- [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite)
- [Value](/plugins/parsers/value), ie: 45 or "booyah"

View File

@ -0,0 +1,5 @@
//go:build !custom || parsers || parsers.opentsdb
package all
import _ "github.com/influxdata/telegraf/plugins/parsers/opentsdb" // register plugin

View File

@ -0,0 +1,29 @@
# OpenTSDB Telnet Style Put Format Parser Plugin
The `OpenTSDB` data format parses data in OpenTSDB's Telnet style put API
format. There are no additional configuration options for OpenTSDB. The metrics
are parsed directly into Telegraf metrics.
For more detail on the format, see:
- [OpenTSDB Telnet "PUT" API guide](http://opentsdb.net/docs/build/html/api_telnet/put.html)
- [OpenTSDB data specification](http://opentsdb.net/docs/build/html/user_guide/writing/index.html#data-specification)
## Configuration
```toml
[[inputs.file]]
files = ["example"]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "opentsdb"
```
## Example
```opentsdb
put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
```

View File

@ -0,0 +1,121 @@
package opentsdb
import (
"bufio"
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
// Parser encapsulates a OpenTSDB Parser.
type Parser struct {
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
scanner := bufio.NewScanner(bytes.NewReader(buf))
for scanner.Scan() {
line := scanner.Text()
// delete LF and CR
line = strings.TrimRight(line, "\r\n")
m, err := p.ParseLine(line)
if err != nil {
p.Log.Errorf("Error parsing %q as opentsdb: %s", line, err)
// Don't let one bad line spoil a whole batch. In particular, it may
// be a valid opentsdb telnet protocol command, like "version", that
// we don't support.
continue
}
metrics = append(metrics, m)
}
if err := scanner.Err(); err != nil {
return nil, err
}
return metrics, nil
}
// ParseLine performs OpenTSDB parsing of a single line.
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
// Break into fields ("put", name, timestamp, value, tag1, tag2, ..., tagN).
fields := strings.Fields(line)
if len(fields) < 4 || fields[0] != "put" {
return nil, errors.New("doesn't have required fields")
}
// decode the name and tags
measurement := fields[1]
tsStr := fields[2]
valueStr := fields[3]
tagStrs := fields[4:]
// Parse value.
v, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
return nil, fmt.Errorf("parsing field %q value failed: %w", measurement, err)
}
fieldValues := map[string]interface{}{"value": v}
// Parse timestamp.
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing field %q time failed: %w", measurement, err)
}
var timestamp time.Time
if ts < 1e12 {
// second resolution
timestamp = time.Unix(ts, 0)
} else {
// millisecond resolution
timestamp = time.UnixMilli(ts)
}
tags := make(map[string]string, len(p.DefaultTags)+len(tagStrs))
for k, v := range p.DefaultTags {
tags[k] = v
}
for _, tag := range tagStrs {
tagValue := strings.Split(tag, "=")
if len(tagValue) != 2 {
continue
}
name := tagValue[0]
value := tagValue[1]
if name == "" || value == "" {
continue
}
tags[name] = value
}
return metric.New(measurement, tags, fieldValues, timestamp), nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func init() {
parsers.Add("opentsdb",
func(defaultMetricName string) telegraf.Parser {
return &Parser{}
})
}

View File

@ -0,0 +1,308 @@
package opentsdb
import (
"strconv"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestParseLine(t *testing.T) {
testTime := time.Now()
testTimeSec := testTime.Round(time.Second)
testTimeMilli := testTime.Round(time.Millisecond)
strTimeSec := strconv.FormatInt(testTimeSec.Unix(), 10)
strTimeMilli := strconv.FormatInt(testTimeMilli.UnixNano()/1000000, 10)
var tests = []struct {
name string
input string
expected telegraf.Metric
}{
{
name: "minimal case",
input: "put sys.cpu.user " + strTimeSec + " 50",
expected: testutil.MustMetric(
"sys.cpu.user",
map[string]string{},
map[string]interface{}{
"value": float64(50),
},
testTimeSec,
),
},
{
name: "millisecond timestamp",
input: "put sys.cpu.user " + strTimeMilli + " 50",
expected: testutil.MustMetric(
"sys.cpu.user",
map[string]string{},
map[string]interface{}{
"value": float64(50),
},
testTimeMilli,
),
},
{
name: "floating point value",
input: "put sys.cpu.user " + strTimeSec + " 42.5",
expected: testutil.MustMetric(
"sys.cpu.user",
map[string]string{},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
{
name: "single tag",
input: "put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01",
expected: testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
{
name: "double tags",
input: "put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7",
expected: testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{Log: testutil.Logger{}}
actual, err := p.ParseLine(tt.input)
require.NoError(t, err)
testutil.RequireMetricEqual(t, tt.expected, actual)
})
}
}
func TestParse(t *testing.T) {
testTime := time.Now()
testTimeSec := testTime.Round(time.Second)
strTimeSec := strconv.FormatInt(testTimeSec.Unix(), 10)
var tests = []struct {
name string
input []byte
expected []telegraf.Metric
}{
{
name: "single line with no newline",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7"),
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
},
{
name: "single line with LF",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7\n"),
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
},
{
name: "single line with CR+LF",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7\r\n"),
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
},
{
name: "double lines",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7\r\n" +
"put sys.cpu.user " + strTimeSec + " 53.5 host=webserver02 cpu=3\r\n"),
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver02",
"cpu": "3",
},
map[string]interface{}{
"value": float64(53.5),
},
testTimeSec,
),
},
},
{
name: "mixed valid/invalid input",
input: []byte(
"version\r\n" +
"put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7\r\n" +
"put sys.cpu.user " + strTimeSec + " 53.5 host=webserver02 cpu=3\r\n",
),
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"host": "webserver02",
"cpu": "3",
},
map[string]interface{}{
"value": float64(53.5),
},
testTimeSec,
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{Log: testutil.Logger{}}
actual, err := p.Parse(tt.input)
require.NoError(t, err)
testutil.RequireMetricsEqual(t, tt.expected, actual)
})
}
}
func TestParse_DefaultTags(t *testing.T) {
testTime := time.Now()
testTimeSec := testTime.Round(time.Second)
strTimeSec := strconv.FormatInt(testTimeSec.Unix(), 10)
var tests = []struct {
name string
input []byte
defaultTags map[string]string
expected []telegraf.Metric
}{
{
name: "single default tag",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7"),
defaultTags: map[string]string{
"foo": "bar",
},
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"foo": "bar",
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
},
{
name: "double default tags",
input: []byte("put sys.cpu.user " + strTimeSec + " 42.5 host=webserver01 cpu=7"),
defaultTags: map[string]string{
"foo1": "bar1",
"foo2": "bar2",
},
expected: []telegraf.Metric{
testutil.MustMetric(
"sys.cpu.user",
map[string]string{
"foo1": "bar1",
"foo2": "bar2",
"host": "webserver01",
"cpu": "7",
},
map[string]interface{}{
"value": float64(42.5),
},
testTimeSec,
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{Log: testutil.Logger{}}
p.SetDefaultTags(tt.defaultTags)
actual, err := p.Parse(tt.input)
require.NoError(t, err)
testutil.RequireMetricsEqual(t, tt.expected, actual)
})
}
}

View File

@ -68,7 +68,7 @@ type ParserCompatibility interface {
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// DataFormat can be one of: avro, json, influx, graphite, value, nagios
// DataFormat can be one of: avro, json, influx, graphite, value, nagios, opentsdb
DataFormat string `toml:"data_format"`
// Separator only applied to Graphite data.

View File

@ -45,7 +45,7 @@ func TestRegistry_BackwardCompatibility(t *testing.T) {
}
// Define parsers that do not have an old-school init
newStyleOnly := []string{"binary", "avro"}
newStyleOnly := []string{"binary", "avro", "opentsdb"}
for name, creator := range parsers.Parsers {
if choice.Contains(name, newStyleOnly) {
t.Logf("skipping new-style-only %q...", name)