Using mime-type in prometheus parser to handle protocol-buffer responses (#8545)
This commit is contained in:
parent
9ee6e034fb
commit
4b7d11385c
|
|
@ -330,7 +330,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
|
|||
}
|
||||
|
||||
if p.MetricVersion == 2 {
|
||||
parser := parser_v2.Parser{}
|
||||
parser := parser_v2.Parser{Header: resp.Header}
|
||||
metrics, err = parser.Parse(body)
|
||||
} else {
|
||||
metrics, err = Parse(body, resp.Header)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,11 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
"io"
|
||||
"math"
|
||||
"mime"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
|
@ -17,6 +21,7 @@ import (
|
|||
|
||||
type Parser struct {
|
||||
DefaultTags map[string]string
|
||||
Header http.Header
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
|
@ -31,9 +36,25 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
|
||||
// Prepare output
|
||||
metricFamilies := make(map[string]*dto.MetricFamily)
|
||||
metricFamilies, err = parser.TextToMetricFamilies(reader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading text format failed: %s", err)
|
||||
mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
|
||||
if err == nil && mediatype == "application/vnd.google.protobuf" &&
|
||||
params["encoding"] == "delimited" &&
|
||||
params["proto"] == "io.prometheus.client.MetricFamily" {
|
||||
for {
|
||||
mf := &dto.MetricFamily{}
|
||||
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
|
||||
if ierr == io.EOF {
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
|
||||
}
|
||||
metricFamilies[mf.GetName()] = mf
|
||||
}
|
||||
} else {
|
||||
metricFamilies, err = parser.TextToMetricFamilies(reader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading text format failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ package prometheus
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -344,3 +347,105 @@ func parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
parser := Parser{}
|
||||
return parser.Parse(buf)
|
||||
}
|
||||
|
||||
func TestParserProtobufHeader(t *testing.T) {
|
||||
var uClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
},
|
||||
}
|
||||
expected := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_free": 9.77911808e+08,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
2,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_in": 2.031616e+06,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
1,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_out": 1.579008e+07,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
1,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_total": 9.93185792e+08,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
2,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_used": 1.5273984e+07,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
2,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{
|
||||
"host": "omsk",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"swap_used_percent": 1.5378778193395661,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
2,
|
||||
),
|
||||
}
|
||||
sampleProtoBufData := []uint8{67, 10, 9, 115, 119, 97, 112, 95, 102, 114, 101, 101, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 224, 36, 205, 65, 65, 10, 7, 115, 119, 97, 112, 95, 105, 110, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 0, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 26, 9, 9, 0, 0, 0, 0, 0, 0, 63, 65, 66, 10, 8, 115, 119, 97, 112, 95, 111, 117, 116, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 0, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 26, 9, 9, 0, 0, 0, 0, 0, 30, 110, 65, 68, 10, 10, 115, 119, 97, 112, 95, 116, 111, 116, 97, 108, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 104, 153, 205, 65, 67, 10, 9, 115, 119, 97, 112, 95, 117, 115, 101, 100, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 0, 34, 109, 65, 75, 10, 17, 115, 119, 97, 112, 95, 117, 115, 101, 100, 95, 112, 101, 114, 99, 101, 110, 116, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 109, 234, 180, 197, 37, 155, 248, 63}
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited")
|
||||
w.Write(sampleProtoBufData)
|
||||
}))
|
||||
defer ts.Close()
|
||||
req, err := http.NewRequest("GET", ts.URL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create new request '%s': %s", ts.URL, err)
|
||||
}
|
||||
var resp *http.Response
|
||||
resp, err = uClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("error making HTTP request to %s: %s", ts.URL, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("error reading body: %s", err)
|
||||
}
|
||||
parser := Parser{Header: resp.Header}
|
||||
metrics, err := parser.Parse(body)
|
||||
if err != nil {
|
||||
t.Fatalf("error reading metrics for %s: %s", ts.URL, err)
|
||||
}
|
||||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue