Don't stop parsing after statsd parsing error (#9423)
This commit is contained in:
parent
f9fc64efd6
commit
c6c3efdb97
|
|
@ -3,7 +3,6 @@ package statsd
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
@ -18,6 +17,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
||||||
"github.com/influxdata/telegraf/selfstat"
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -35,6 +35,8 @@ const (
|
||||||
parserGoRoutines = 5
|
parserGoRoutines = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errParsing = errors.New("error parsing statsd line")
|
||||||
|
|
||||||
// Statsd allows the importing of statsd and dogstatsd data.
|
// Statsd allows the importing of statsd and dogstatsd data.
|
||||||
type Statsd struct {
|
type Statsd struct {
|
||||||
// Protocol used on listener - udp or tcp
|
// Protocol used on listener - udp or tcp
|
||||||
|
|
@ -568,6 +570,10 @@ func (s *Statsd) parser() error {
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if err := s.parseStatsdLine(line); err != nil {
|
if err := s.parseStatsdLine(line); err != nil {
|
||||||
|
if errors.Cause(err) == errParsing {
|
||||||
|
// parsing errors log when the error occurs
|
||||||
|
continue
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -605,7 +611,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
bits := strings.Split(line, ":")
|
bits := strings.Split(line, ":")
|
||||||
if len(bits) < 2 {
|
if len(bits) < 2 {
|
||||||
s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
|
s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
|
||||||
return errors.New("error Parsing statsd line")
|
return errParsing
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract bucket name from individual metric bits
|
// Extract bucket name from individual metric bits
|
||||||
|
|
@ -621,7 +627,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
pipesplit := strings.Split(bit, "|")
|
pipesplit := strings.Split(bit, "|")
|
||||||
if len(pipesplit) < 2 {
|
if len(pipesplit) < 2 {
|
||||||
s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
|
s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
|
||||||
return errors.New("error parsing statsd line")
|
return errParsing
|
||||||
} else if len(pipesplit) > 2 {
|
} else if len(pipesplit) > 2 {
|
||||||
sr := pipesplit[2]
|
sr := pipesplit[2]
|
||||||
|
|
||||||
|
|
@ -645,14 +651,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
m.mtype = pipesplit[1]
|
m.mtype = pipesplit[1]
|
||||||
default:
|
default:
|
||||||
s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
|
s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
|
||||||
return errors.New("error parsing statsd line")
|
return errParsing
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the value
|
// Parse the value
|
||||||
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
|
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
|
||||||
if m.mtype != "g" && m.mtype != "c" {
|
if m.mtype != "g" && m.mtype != "c" {
|
||||||
s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
|
s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
|
||||||
return errors.New("error parsing statsd line")
|
return errParsing
|
||||||
}
|
}
|
||||||
m.additive = true
|
m.additive = true
|
||||||
}
|
}
|
||||||
|
|
@ -662,7 +668,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
v, err := strconv.ParseFloat(pipesplit[0], 64)
|
v, err := strconv.ParseFloat(pipesplit[0], 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
|
s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
|
||||||
return errors.New("error parsing statsd line")
|
return errParsing
|
||||||
}
|
}
|
||||||
m.floatvalue = v
|
m.floatvalue = v
|
||||||
case "c":
|
case "c":
|
||||||
|
|
@ -672,7 +678,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
|
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
|
s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
|
||||||
return errors.New("error parsing statsd line")
|
return errParsing
|
||||||
}
|
}
|
||||||
v = int64(v2)
|
v = int64(v2)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue