2022-03-11 06:09:58 +08:00
|
|
|
package influx_upstream
|
|
|
|
|
|
|
|
|
|
import (
	"errors"
	"fmt"
	"io"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/influxdata/line-protocol/v2/lineprotocol"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/parsers"
)
|
|
|
|
|
|
|
|
|
|
// maxErrorBufferSize caps how many bytes of the offending line are quoted in
// a ParseError message.
const maxErrorBufferSize = 1024
|
|
|
|
|
|
|
|
|
|
var (
	// ErrNoMetric is returned by ParseLine when the line yields no metric.
	ErrNoMetric = errors.New("no metric in line")

	// ErrEOF is returned by StreamParser.Next once the stream has no further
	// entries.
	ErrEOF = errors.New("EOF")
)
|
|
|
|
|
|
|
|
|
|
// TimeFunc supplies the timestamp handed to the decoder as the default time
// for an entry.
type TimeFunc func() time.Time
|
|
|
|
|
|
|
|
|
|
// nthIndexAny returns the byte index of the nth (one-based) occurrence in s of
// any Unicode code point contained in chars. It returns -1 when s holds fewer
// than n occurrences or when n is not positive.
func nthIndexAny(s, chars string, n int) int {
	offset := 0
	for found := 1; found <= n; found++ {
		i := strings.IndexAny(s[offset:], chars)
		if i < 0 {
			break
		}

		offset += i
		if found == n {
			return offset
		}

		// Step over the code point that just matched. Its encoded width is
		// unrelated to len(chars), which counts every candidate in the set;
		// advancing by len(chars) would skip adjacent matches and could move
		// offset past the end of s.
		_, width := utf8.DecodeRuneInString(s[offset:])
		offset += width
	}

	return -1
}
|
|
|
|
|
|
|
|
|
|
// ParseError indicates a error in the parsing of the text.
|
|
|
|
|
type ParseError struct {
|
|
|
|
|
*lineprotocol.DecodeError
|
|
|
|
|
buf string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *ParseError) Error() string {
|
|
|
|
|
// When an error occurs within the stream decoder, we do not have access
|
|
|
|
|
// to the internal buffer, so we cannot display the contents of the invalid
|
|
|
|
|
// metric
|
|
|
|
|
if e.buf == "" {
|
|
|
|
|
return fmt.Sprintf("metric parse error: %s at %d:%d", e.Err, e.Line, e.Column)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lineStart := nthIndexAny(e.buf, "\n", int(e.Line-1)) + 1
|
|
|
|
|
buffer := e.buf[lineStart:]
|
|
|
|
|
eol := strings.IndexAny(buffer, "\n")
|
|
|
|
|
if eol >= 0 {
|
|
|
|
|
buffer = strings.TrimSuffix(buffer[:eol], "\r")
|
|
|
|
|
}
|
|
|
|
|
if len(buffer) > maxErrorBufferSize {
|
|
|
|
|
startEllipsis := true
|
|
|
|
|
offset := e.Column - 1 - lineStart
|
|
|
|
|
if offset > len(buffer) || offset < 0 {
|
|
|
|
|
offset = len(buffer)
|
|
|
|
|
}
|
|
|
|
|
start := offset - maxErrorBufferSize
|
|
|
|
|
if start < 0 {
|
|
|
|
|
startEllipsis = false
|
|
|
|
|
start = 0
|
|
|
|
|
}
|
|
|
|
|
// if we trimmed it the column won't line up. it'll always be the last character,
|
|
|
|
|
// because the parser doesn't continue past it, but point it out anyway so
|
|
|
|
|
// it's obvious where the issue is.
|
|
|
|
|
buffer = buffer[start:offset] + "<-- here"
|
|
|
|
|
if startEllipsis {
|
|
|
|
|
buffer = "..." + buffer
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.Err, e.Line, e.Column, buffer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// convertToParseError attempts to convert a lineprotocol.DecodeError to a ParseError
|
|
|
|
|
func convertToParseError(input []byte, rawErr error) error {
|
2023-02-22 19:08:46 +08:00
|
|
|
var decErr *lineprotocol.DecodeError
|
|
|
|
|
if !errors.As(rawErr, &decErr) {
|
2022-03-11 06:09:58 +08:00
|
|
|
return rawErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &ParseError{
|
2023-02-22 19:08:46 +08:00
|
|
|
DecodeError: decErr,
|
2022-03-11 06:09:58 +08:00
|
|
|
buf: string(input),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parser is an InfluxDB Line Protocol parser that implements the
|
|
|
|
|
// parsers.Parser interface.
|
|
|
|
|
type Parser struct {
|
2023-07-14 01:09:12 +08:00
|
|
|
InfluxTimestampPrecsion config.Duration `toml:"influx_timestamp_precision"`
|
|
|
|
|
DefaultTags map[string]string `toml:"-"`
|
2022-07-07 04:23:13 +08:00
|
|
|
// If set to "series" a series machine will be initialized, defaults to regular machine
|
|
|
|
|
Type string `toml:"-"`
|
2022-03-11 06:09:58 +08:00
|
|
|
|
|
|
|
|
defaultTime TimeFunc
|
|
|
|
|
precision lineprotocol.Precision
|
|
|
|
|
allowPartial bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) SetTimeFunc(f TimeFunc) {
|
|
|
|
|
p.defaultTime = f
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
|
|
|
|
metrics := make([]telegraf.Metric, 0)
|
|
|
|
|
decoder := lineprotocol.NewDecoderWithBytes(input)
|
|
|
|
|
|
|
|
|
|
for decoder.Next() {
|
|
|
|
|
m, err := nextMetric(decoder, p.precision, p.defaultTime, p.allowPartial)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, convertToParseError(input, err)
|
|
|
|
|
}
|
|
|
|
|
metrics = append(metrics, m)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p.applyDefaultTags(metrics)
|
|
|
|
|
return metrics, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
|
|
|
|
metrics, err := p.Parse([]byte(line))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(metrics) < 1 {
|
|
|
|
|
return nil, ErrNoMetric
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return metrics[0], nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
|
|
|
|
p.DefaultTags = tags
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-14 01:09:12 +08:00
|
|
|
func (p *Parser) SetTimePrecision(u time.Duration) error {
|
2022-03-11 06:09:58 +08:00
|
|
|
switch u {
|
2023-07-14 01:09:12 +08:00
|
|
|
case 0:
|
|
|
|
|
p.precision = lineprotocol.Nanosecond
|
2022-03-11 06:09:58 +08:00
|
|
|
case time.Nanosecond:
|
|
|
|
|
p.precision = lineprotocol.Nanosecond
|
|
|
|
|
case time.Microsecond:
|
|
|
|
|
p.precision = lineprotocol.Microsecond
|
|
|
|
|
case time.Millisecond:
|
|
|
|
|
p.precision = lineprotocol.Millisecond
|
|
|
|
|
case time.Second:
|
|
|
|
|
p.precision = lineprotocol.Second
|
2023-07-14 01:09:12 +08:00
|
|
|
default:
|
|
|
|
|
return fmt.Errorf("invalid time precision: %d", u)
|
2022-03-11 06:09:58 +08:00
|
|
|
}
|
2023-07-14 01:09:12 +08:00
|
|
|
|
|
|
|
|
return nil
|
2022-03-11 06:09:58 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
|
|
|
|
|
if len(p.DefaultTags) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, m := range metrics {
|
|
|
|
|
p.applyDefaultTagsSingle(m)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
|
|
|
|
|
for k, v := range p.DefaultTags {
|
|
|
|
|
if !m.HasTag(k) {
|
|
|
|
|
m.AddTag(k, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-07-07 04:23:13 +08:00
|
|
|
func (p *Parser) Init() error {
|
2023-07-14 01:09:12 +08:00
|
|
|
if err := p.SetTimePrecision(time.Duration(p.InfluxTimestampPrecsion)); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2022-07-07 04:23:13 +08:00
|
|
|
p.defaultTime = time.Now
|
|
|
|
|
p.allowPartial = p.Type == "series"
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
parsers.Add("influx_upstream",
|
|
|
|
|
func(_ string) telegraf.Parser {
|
|
|
|
|
return &Parser{}
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-11 06:09:58 +08:00
|
|
|
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
|
|
|
|
|
// concurrent use in multiple goroutines.
|
|
|
|
|
type StreamParser struct {
|
|
|
|
|
decoder *lineprotocol.Decoder
|
|
|
|
|
defaultTime TimeFunc
|
|
|
|
|
precision lineprotocol.Precision
|
|
|
|
|
lastError error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewStreamParser(r io.Reader) *StreamParser {
|
|
|
|
|
return &StreamParser{
|
|
|
|
|
decoder: lineprotocol.NewDecoder(r),
|
|
|
|
|
defaultTime: time.Now,
|
|
|
|
|
precision: lineprotocol.Nanosecond,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SetTimeFunc changes the function used to determine the time of metrics
|
|
|
|
|
// without a timestamp. The default TimeFunc is time.Now. Useful mostly for
|
|
|
|
|
// testing, or perhaps if you want all metrics to have the same timestamp.
|
|
|
|
|
func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
|
|
|
|
|
sp.defaultTime = f
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-03 21:23:06 +08:00
|
|
|
func (sp *StreamParser) SetTimePrecision(u time.Duration) error {
|
2022-03-11 06:09:58 +08:00
|
|
|
switch u {
|
|
|
|
|
case time.Nanosecond:
|
|
|
|
|
sp.precision = lineprotocol.Nanosecond
|
|
|
|
|
case time.Microsecond:
|
|
|
|
|
sp.precision = lineprotocol.Microsecond
|
|
|
|
|
case time.Millisecond:
|
|
|
|
|
sp.precision = lineprotocol.Millisecond
|
|
|
|
|
case time.Second:
|
|
|
|
|
sp.precision = lineprotocol.Second
|
2022-10-03 21:23:06 +08:00
|
|
|
case time.Minute:
|
2024-02-09 01:32:30 +08:00
|
|
|
return errors.New("time precision 'm' is not supported")
|
2022-10-03 21:23:06 +08:00
|
|
|
case time.Hour:
|
2024-02-09 01:32:30 +08:00
|
|
|
return errors.New("time precision 'h' is not supported")
|
2022-03-11 06:09:58 +08:00
|
|
|
}
|
2022-10-03 21:23:06 +08:00
|
|
|
|
|
|
|
|
return nil
|
2022-03-11 06:09:58 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Next parses the next item from the stream. You can repeat calls to this
|
|
|
|
|
// function if it returns ParseError to get the next metric or error.
|
|
|
|
|
func (sp *StreamParser) Next() (telegraf.Metric, error) {
|
|
|
|
|
if !sp.decoder.Next() {
|
2023-02-22 19:08:46 +08:00
|
|
|
if err := sp.decoder.Err(); err != nil && !errors.Is(err, sp.lastError) {
|
2022-03-11 06:09:58 +08:00
|
|
|
sp.lastError = err
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil, ErrEOF
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m, err := nextMetric(sp.decoder, sp.precision, sp.defaultTime, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, convertToParseError([]byte{}, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return m, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func nextMetric(decoder *lineprotocol.Decoder, precision lineprotocol.Precision, defaultTime TimeFunc, allowPartial bool) (telegraf.Metric, error) {
|
|
|
|
|
measurement, err := decoder.Measurement()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
m := metric.New(string(measurement), nil, nil, time.Time{})
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
key, value, err := decoder.NextTag()
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Allow empty tags for series parser
|
|
|
|
|
if strings.Contains(err.Error(), "empty tag name") && allowPartial {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
|
} else if key == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m.AddTag(string(key), string(value))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
key, value, err := decoder.NextField()
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Allow empty fields for series parser
|
|
|
|
|
if strings.Contains(err.Error(), "expected field key") && allowPartial {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
|
} else if key == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m.AddField(string(key), value.Interface())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t, err := decoder.Time(precision, defaultTime())
|
|
|
|
|
if err != nil && !allowPartial {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
m.SetTime(t)
|
|
|
|
|
|
|
|
|
|
return m, nil
|
|
|
|
|
}
|