New prometheus remote write parser (#8967)
This commit is contained in:
parent
cc6c51cf16
commit
67f588cbce
|
|
@ -15,6 +15,7 @@ Protocol or in JSON format.
|
|||
- [Logfmt](/plugins/parsers/logfmt)
|
||||
- [Nagios](/plugins/parsers/nagios)
|
||||
- [Prometheus](/plugins/parsers/prometheus)
|
||||
- [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite)
|
||||
- [Value](/plugins/parsers/value), ie: 45 or "booyah"
|
||||
- [Wavefront](/plugins/parsers/wavefront)
|
||||
- [XML](/plugins/parsers/xml)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
# Prometheus remote write
|
||||
|
||||
Converts prometheus remote write samples directly into Telegraf metrics. It can be used with [http_listener_v2](/plugins/inputs/http_listener_v2). There are no additional configuration options for Prometheus Remote Write Samples.
|
||||
|
||||
### Configuration
|
||||
|
||||
```toml
|
||||
[[inputs.http_listener_v2]]
|
||||
## Address and port to host HTTP listener on
|
||||
service_address = ":1234"
|
||||
|
||||
## Path to listen to.
|
||||
path = "/recieve"
|
||||
|
||||
## Data format to consume.
|
||||
data_format = "prometheusremotewrite"
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
**Example Input**
|
||||
```
|
||||
prompb.WriteRequest{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "localhost:9090"},
|
||||
{Name: "job", Value: "prometheus"},
|
||||
{Name: "quantile", Value: "0.99"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**Example Output**
|
||||
```
|
||||
prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000
|
||||
```
|
||||
|
|
@ -0,0 +1,88 @@
|
|||
package prometheusremotewrite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
type Parser struct {
|
||||
DefaultTags map[string]string
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
var err error
|
||||
var metrics []telegraf.Metric
|
||||
var req prompb.WriteRequest
|
||||
|
||||
if err := proto.Unmarshal(buf, &req); err != nil {
|
||||
return nil, fmt.Errorf("unable to unmarshal request body: %s", err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for _, ts := range req.Timeseries {
|
||||
tags := map[string]string{}
|
||||
for key, value := range p.DefaultTags {
|
||||
tags[key] = value
|
||||
}
|
||||
|
||||
for _, l := range ts.Labels {
|
||||
tags[l.Name] = l.Value
|
||||
}
|
||||
|
||||
metricName := tags[model.MetricNameLabel]
|
||||
if metricName == "" {
|
||||
return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel)
|
||||
}
|
||||
delete(tags, model.MetricNameLabel)
|
||||
|
||||
for _, s := range ts.Samples {
|
||||
fields := make(map[string]interface{})
|
||||
if !math.IsNaN(s.Value) {
|
||||
fields[metricName] = s.Value
|
||||
}
|
||||
// converting to telegraf metric
|
||||
if len(fields) > 0 {
|
||||
t := now
|
||||
if s.Timestamp > 0 {
|
||||
t = time.Unix(0, s.Timestamp*1000000)
|
||||
}
|
||||
m, err := metric.New("prometheus_remote_write", tags, fields, t)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to convert to telegraf metric: %s", err)
|
||||
}
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
return metrics, err
|
||||
}
|
||||
|
||||
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(line))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("No metrics in line")
|
||||
}
|
||||
|
||||
if len(metrics) > 1 {
|
||||
return nil, fmt.Errorf("More than one metric in line")
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||
p.DefaultTags = tags
|
||||
}
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
package prometheusremotewrite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParse(t *testing.T) {
|
||||
prompbInput := prompb.WriteRequest{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "quantile", Value: "0.99"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "prometheus_target_interval_length_seconds"},
|
||||
{Name: "job", Value: "prometheus"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 14.99, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
inoutBytes, err := prompbInput.Marshal()
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus_remote_write",
|
||||
map[string]string{
|
||||
"quantile": "0.99",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"go_gc_duration_seconds": float64(4.63),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus_remote_write",
|
||||
map[string]string{
|
||||
"job": "prometheus",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"prometheus_target_interval_length_seconds": float64(14.99),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
parser := Parser{
|
||||
DefaultTags: map[string]string{},
|
||||
}
|
||||
|
||||
metrics, err := parser.Parse(inoutBytes)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 2)
|
||||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||
}
|
||||
|
||||
func TestDefaultTags(t *testing.T) {
|
||||
prompbInput := prompb.WriteRequest{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "foo"},
|
||||
{Name: "__eg__", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
inoutBytes, err := prompbInput.Marshal()
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus_remote_write",
|
||||
map[string]string{
|
||||
"defaultTag": "defaultTagValue",
|
||||
"__eg__": "bar",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"foo": float64(1),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
parser := Parser{
|
||||
DefaultTags: map[string]string{
|
||||
"defaultTag": "defaultTagValue",
|
||||
},
|
||||
}
|
||||
|
||||
metrics, err := parser.Parse(inoutBytes)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||
}
|
||||
|
||||
func TestMetricsWithTimestamp(t *testing.T) {
|
||||
testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC)
|
||||
testTimeUnix := testTime.UnixNano() / int64(time.Millisecond)
|
||||
prompbInput := prompb.WriteRequest{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "foo"},
|
||||
{Name: "__eg__", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: testTimeUnix},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
inoutBytes, err := prompbInput.Marshal()
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus_remote_write",
|
||||
map[string]string{
|
||||
"__eg__": "bar",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"foo": float64(1),
|
||||
},
|
||||
testTime,
|
||||
),
|
||||
}
|
||||
parser := Parser{
|
||||
DefaultTags: map[string]string{},
|
||||
}
|
||||
|
||||
metrics, err := parser.Parse(inoutBytes)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics())
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/nagios"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/value"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/xml"
|
||||
|
|
@ -248,6 +249,8 @@ func NewParser(config *Config) (Parser, error) {
|
|||
)
|
||||
case "prometheus":
|
||||
parser, err = NewPrometheusParser(config.DefaultTags)
|
||||
case "prometheusremotewrite":
|
||||
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
|
||||
case "xml":
|
||||
parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig)
|
||||
default:
|
||||
|
|
@ -361,6 +364,12 @@ func NewPrometheusParser(defaultTags map[string]string) (Parser, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
|
||||
return &prometheusremotewrite.Parser{
|
||||
DefaultTags: defaultTags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewXMLParser(metricName string, defaultTags map[string]string, xmlConfigs []XMLConfig) (Parser, error) {
|
||||
// Convert the config formats which is a one-to-one copy
|
||||
configs := make([]xml.Config, len(xmlConfigs))
|
||||
|
|
|
|||
|
|
@ -3,13 +3,14 @@ package prometheusremotewrite
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
|
|
|||
Loading…
Reference in New Issue