Semigroupoid multiline (#8167) (#8190)

Co-authored-by: javicrespo <javiercrespoalvez@gmail.com>
Co-authored-by: jcrespo <javier.crespo@ingenico.com>
Co-authored-by: semigroupoid <semigroupoid@users.noreply.github.com>
This commit is contained in:
Steven Soroka 2020-09-28 18:06:00 -04:00 committed by GitHub
parent b4fb1adc6f
commit 382dac70c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 687 additions and 11 deletions

View File

@ -62,6 +62,23 @@ The plugin expects messages in one of the
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## multiline parser/codec
## https://www.elastic.co/guide/en/logstash/2.4/plugins-filters-multiline.html
#[inputs.tail.multiline]
## The pattern should be a regexp which matches what you believe to be an indicator that the field is part of an event consisting of multiple lines of log data.
#pattern = "^\s"
## The field's value must be previous or next and indicates the relation to the
## multi-line event.
#match_which_line = "previous"
## The invert_match can be true or false (defaults to false).
## If true, a message not matching the pattern will constitute a match of the multiline filter and the what will be applied. (vice-versa is also true)
#invert_match = false
#After the specified timeout, this plugin sends the multiline event even if no new pattern is found to start a new event. The default is 5s.
#timeout = 5s
```
### Metrics

View File

@ -0,0 +1,135 @@
package tail
import (
"bytes"
"fmt"
"regexp"
"strings"
"time"
"github.com/influxdata/telegraf/internal"
)
// Indicates relation to the multiline event: previous or next
type MultilineMatchWhichLine int
type Multiline struct {
config *MultilineConfig
enabled bool
patternRegexp *regexp.Regexp
}
type MultilineConfig struct {
Pattern string
MatchWhichLine MultilineMatchWhichLine `toml:"match_which_line"`
InvertMatch bool
Timeout *internal.Duration
}
const (
// Previous => Append current line to previous line
Previous MultilineMatchWhichLine = iota
// Next => Next line will be appended to current line
Next
)
func (m *MultilineConfig) NewMultiline() (*Multiline, error) {
enabled := false
var r *regexp.Regexp
var err error
if m.Pattern != "" {
enabled = true
if r, err = regexp.Compile(m.Pattern); err != nil {
return nil, err
}
if m.Timeout == nil || m.Timeout.Duration.Nanoseconds() == int64(0) {
m.Timeout = &internal.Duration{Duration: 5 * time.Second}
}
}
return &Multiline{
config: m,
enabled: enabled,
patternRegexp: r}, nil
}
func (m *Multiline) IsEnabled() bool {
return m.enabled
}
func (m *Multiline) ProcessLine(text string, buffer *bytes.Buffer) string {
if m.matchString(text) {
buffer.WriteString(text)
return ""
}
if m.config.MatchWhichLine == Previous {
previousText := buffer.String()
buffer.Reset()
buffer.WriteString(text)
text = previousText
} else {
// Next
if buffer.Len() > 0 {
buffer.WriteString(text)
text = buffer.String()
buffer.Reset()
}
}
return text
}
func (m *Multiline) Flush(buffer *bytes.Buffer) string {
if buffer.Len() == 0 {
return ""
}
text := buffer.String()
buffer.Reset()
return text
}
func (m *Multiline) matchString(text string) bool {
return m.patternRegexp.MatchString(text) != m.config.InvertMatch
}
func (w MultilineMatchWhichLine) String() string {
switch w {
case Previous:
return "previous"
case Next:
return "next"
}
return ""
}
// UnmarshalTOML implements ability to unmarshal MultilineMatchWhichLine from TOML files.
func (w *MultilineMatchWhichLine) UnmarshalTOML(data []byte) (err error) {
return w.UnmarshalText(data)
}
// UnmarshalText implements encoding.TextUnmarshaler
func (w *MultilineMatchWhichLine) UnmarshalText(data []byte) (err error) {
s := string(data)
switch strings.ToUpper(s) {
case `PREVIOUS`, `"PREVIOUS"`, `'PREVIOUS'`:
*w = Previous
return
case `NEXT`, `"NEXT"`, `'NEXT'`:
*w = Next
return
}
*w = -1
return fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine")
}
// MarshalText implements encoding.TextMarshaler
func (w MultilineMatchWhichLine) MarshalText() ([]byte, error) {
s := w.String()
if s != "" {
return []byte(s), nil
}
return nil, fmt.Errorf("E! [inputs.tail] unknown multiline MatchWhichLine")
}

View File

@ -0,0 +1,235 @@
package tail
import (
"bytes"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
)
func TestMultilineConfigOK(t *testing.T) {
c := &MultilineConfig{
Pattern: ".*",
MatchWhichLine: Previous,
}
_, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
}
func TestMultilineConfigError(t *testing.T) {
c := &MultilineConfig{
Pattern: "\xA0",
MatchWhichLine: Previous,
}
_, err := c.NewMultiline()
assert.Error(t, err, "The pattern was invalid")
}
func TestMultilineConfigTimeoutSpecified(t *testing.T) {
duration, _ := time.ParseDuration("10s")
c := &MultilineConfig{
Pattern: ".*",
MatchWhichLine: Previous,
Timeout: &internal.Duration{Duration: duration},
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
assert.Equal(t, duration, m.config.Timeout.Duration)
}
func TestMultilineConfigDefaultTimeout(t *testing.T) {
duration, _ := time.ParseDuration("5s")
c := &MultilineConfig{
Pattern: ".*",
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
assert.Equal(t, duration, m.config.Timeout.Duration)
}
func TestMultilineIsEnabled(t *testing.T) {
c := &MultilineConfig{
Pattern: ".*",
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
isEnabled := m.IsEnabled()
assert.True(t, isEnabled, "Should have been enabled")
}
func TestMultilineIsDisabled(t *testing.T) {
c := &MultilineConfig{
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
isEnabled := m.IsEnabled()
assert.False(t, isEnabled, "Should have been disabled")
}
func TestMultilineFlushEmpty(t *testing.T) {
c := &MultilineConfig{
Pattern: "^=>",
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
var buffer bytes.Buffer
text := m.Flush(&buffer)
assert.Empty(t, text)
}
func TestMultilineFlush(t *testing.T) {
c := &MultilineConfig{
Pattern: "^=>",
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
var buffer bytes.Buffer
buffer.WriteString("foo")
text := m.Flush(&buffer)
assert.Equal(t, "foo", text)
assert.Zero(t, buffer.Len())
}
func TestMultiLineProcessLinePrevious(t *testing.T) {
c := &MultilineConfig{
Pattern: "^=>",
MatchWhichLine: Previous,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
var buffer bytes.Buffer
text := m.ProcessLine("1", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("=>2", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("=>3", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("4", &buffer)
assert.Equal(t, "1=>2=>3", text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("5", &buffer)
assert.Equal(t, "4", text)
assert.Equal(t, "5", buffer.String())
}
func TestMultiLineProcessLineNext(t *testing.T) {
c := &MultilineConfig{
Pattern: "=>$",
MatchWhichLine: Next,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
var buffer bytes.Buffer
text := m.ProcessLine("1=>", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("2=>", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("3=>", &buffer)
assert.Empty(t, text)
assert.NotZero(t, buffer.Len())
text = m.ProcessLine("4", &buffer)
assert.Equal(t, "1=>2=>3=>4", text)
assert.Zero(t, buffer.Len())
text = m.ProcessLine("5", &buffer)
assert.Equal(t, "5", text)
assert.Zero(t, buffer.Len())
}
func TestMultiLineMatchStringWithInvertMatchFalse(t *testing.T) {
c := &MultilineConfig{
Pattern: "=>$",
MatchWhichLine: Next,
InvertMatch: false,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
matches1 := m.matchString("t=>")
matches2 := m.matchString("t")
assert.True(t, matches1)
assert.False(t, matches2)
}
func TestMultiLineMatchStringWithInvertTrue(t *testing.T) {
c := &MultilineConfig{
Pattern: "=>$",
MatchWhichLine: Next,
InvertMatch: true,
}
m, err := c.NewMultiline()
assert.NoError(t, err, "Configuration was OK.")
matches1 := m.matchString("t=>")
matches2 := m.matchString("t")
assert.False(t, matches1)
assert.True(t, matches2)
}
func TestMultilineWhat(t *testing.T) {
var w1 MultilineMatchWhichLine
w1.UnmarshalTOML([]byte(`"previous"`))
assert.Equal(t, Previous, w1)
var w2 MultilineMatchWhichLine
w2.UnmarshalTOML([]byte(`previous`))
assert.Equal(t, Previous, w2)
var w3 MultilineMatchWhichLine
w3.UnmarshalTOML([]byte(`'previous'`))
assert.Equal(t, Previous, w3)
var w4 MultilineMatchWhichLine
w4.UnmarshalTOML([]byte(`"next"`))
assert.Equal(t, Next, w4)
var w5 MultilineMatchWhichLine
w5.UnmarshalTOML([]byte(`next`))
assert.Equal(t, Next, w5)
var w6 MultilineMatchWhichLine
w6.UnmarshalTOML([]byte(`'next'`))
assert.Equal(t, Next, w6)
var w7 MultilineMatchWhichLine
err := w7.UnmarshalTOML([]byte(`nope`))
assert.Equal(t, MultilineMatchWhichLine(-1), w7)
assert.Error(t, err)
}

View File

@ -3,11 +3,13 @@
package tail
import (
"bytes"
"context"
"errors"
"io"
"strings"
"sync"
"time"
"github.com/dimchansky/utfbom"
"github.com/influxdata/tail"
@ -45,11 +47,16 @@ type Tail struct {
offsets map[string]int64
parserFunc parsers.ParserFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem semaphore
decoder *encoding.Decoder
acc telegraf.TrackingAccumulator
MultilineConfig MultilineConfig `toml:"multiline"`
multiline *Multiline
ctx context.Context
cancel context.CancelFunc
sem semaphore
decoder *encoding.Decoder
}
func NewTail() *Tail {
@ -107,6 +114,27 @@ const sampleConfig = `
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## multiline parser/codec
## https://www.elastic.co/guide/en/logstash/2.4/plugins-filters-multiline.html
#[inputs.tail.multiline]
## The pattern should be a regexp which matches what you believe to be an
## indicator that the field is part of an event consisting of multiple lines of log data.
#pattern = "^\s"
## This field must be either "previous" or "next".
## If a line matches the pattern, "previous" indicates that it belongs to the previous line,
## whereas "next" indicates that the line belongs to the next one.
#match_which_line = "previous"
## The invert_match field can be true or false (defaults to false).
## If true, a message not matching the pattern will constitute a match of the multiline
## filter and the what will be applied. (vice-versa is also true)
#invert_match = false
## After the specified timeout, this plugin sends a multiline event even if no new pattern
## is found to start a new event. The default timeout is 5s.
#timeout = 5s
`
func (t *Tail) SampleConfig() string {
@ -150,9 +178,16 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
}
}()
var err error
t.multiline, err = t.MultilineConfig.NewMultiline()
if err != nil {
return err
}
t.tailers = make(map[string]*tail.Tail)
err := t.tailNewFiles(t.FromBeginning)
err = t.tailNewFiles(t.FromBeginning)
// clear offsets
t.offsets = make(map[string]int64)
@ -212,6 +247,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
return r
},
})
if err != nil {
t.Log.Debugf("Failed to open file (%s): %v", file, err)
continue
@ -227,6 +263,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
// create a goroutine for each "tailer"
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.receiver(parser, tailer)
@ -237,6 +274,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
}
}()
t.tailers[tailer.Filename] = tailer
}
}
@ -272,18 +310,72 @@ func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.M
// for changes, parse any incoming msgs, and add to the accumulator.
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
var firstLine = true
for line := range tailer.Lines {
if line.Err != nil {
// holds the individual lines of multi-line log entries.
var buffer bytes.Buffer
var timer *time.Timer
var timeout <-chan time.Time
// The multiline mode requires a timer in order to flush the multiline buffer
// if no new lines are incoming.
if t.multiline.IsEnabled() {
timer = time.NewTimer(t.MultilineConfig.Timeout.Duration)
timeout = timer.C
}
channelOpen := true
tailerOpen := true
var line *tail.Line
for {
line = nil
if timer != nil {
timer.Reset(t.MultilineConfig.Timeout.Duration)
}
select {
case <-t.ctx.Done():
channelOpen = false
case line, tailerOpen = <-tailer.Lines:
if !tailerOpen {
channelOpen = false
}
case <-timeout:
}
var text string
if line != nil {
// Fix up files with Windows line endings.
text = strings.TrimRight(line.Text, "\r")
if t.multiline.IsEnabled() {
if text = t.multiline.ProcessLine(text, &buffer); text == "" {
continue
}
}
}
if line == nil || !channelOpen || !tailerOpen {
if text += t.multiline.Flush(&buffer); text == "" {
if !channelOpen {
return
}
continue
}
}
if line != nil && line.Err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, line.Err.Error())
continue
}
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")
metrics, err := parseLine(parser, text, firstLine)
if err != nil {
t.Log.Errorf("Malformed log line in %q: [%q]: %s",
tailer.Filename, line.Text, err.Error())
tailer.Filename, text, err.Error())
continue
}
firstLine = false
@ -292,6 +384,18 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
metric.AddTag("path", tailer.Filename)
}
// try writing out metric first without blocking
select {
case t.sem <- empty{}:
t.acc.AddTrackingMetricGroup(metrics)
if t.ctx.Err() != nil {
return // exit!
}
continue // next loop
default:
// no room. switch to blocking write.
}
// Block until plugin is stopping or room is available to add metrics.
select {
case <-t.ctx.Done():

View File

@ -5,10 +5,13 @@ import (
"io/ioutil"
"log"
"os"
"runtime"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/influx"
@ -88,6 +91,173 @@ func TestTailDosLineendings(t *testing.T) {
})
}
func TestGrokParseLogFilesWithMultiline(t *testing.T) {
thisdir := getCurrentDir()
//we make sure the timeout won't kick in
duration, _ := time.ParseDuration("100s")
tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{thisdir + "testdata/test_multiline.log"}
tt.MultilineConfig = MultilineConfig{
Pattern: `^[^\[]`,
MatchWhichLine: Previous,
InvertMatch: false,
Timeout: &internal.Duration{Duration: duration},
}
tt.SetParserFunc(createGrokParser)
err := tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
assert.NoError(t, tt.Start(&acc))
defer tt.Stop()
acc.Wait(3)
expectedPath := thisdir + "testdata/test_multiline.log"
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: This is debug",
},
map[string]string{
"path": expectedPath,
"loglevel": "DEBUG",
})
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: This is info",
},
map[string]string{
"path": expectedPath,
"loglevel": "INFO",
})
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: Sorry, something wrong! java.lang.ArithmeticException: / by zero\tat com.foo.HelloExample2.divide(HelloExample2.java:24)\tat com.foo.HelloExample2.main(HelloExample2.java:14)",
},
map[string]string{
"path": expectedPath,
"loglevel": "ERROR",
})
assert.Equal(t, uint64(3), acc.NMetrics())
}
func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
// This seems neccessary in order to get the test to read the following lines.
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is fluff\r\n")
require.NoError(t, err)
require.NoError(t, tmpfile.Sync())
// set tight timeout for tests
duration := 10 * time.Millisecond
tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.MultilineConfig = MultilineConfig{
Pattern: `^[^\[]`,
MatchWhichLine: Previous,
InvertMatch: false,
Timeout: &internal.Duration{Duration: duration},
}
tt.SetParserFunc(createGrokParser)
err = tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
assert.NoError(t, tt.Start(&acc))
time.Sleep(11 * time.Millisecond) // will force timeout
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n")
require.NoError(t, err)
require.NoError(t, tmpfile.Sync())
acc.Wait(2)
time.Sleep(11 * time.Millisecond) // will force timeout
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] WARN HelloExample: This is warn\r\n")
require.NoError(t, err)
require.NoError(t, tmpfile.Sync())
acc.Wait(3)
tt.Stop()
assert.Equal(t, uint64(3), acc.NMetrics())
expectedPath := tmpfile.Name()
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: This is info",
},
map[string]string{
"path": expectedPath,
"loglevel": "INFO",
})
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: This is warn",
},
map[string]string{
"path": expectedPath,
"loglevel": "WARN",
})
}
func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *testing.T) {
thisdir := getCurrentDir()
//we make sure the timeout won't kick in
duration := 100 * time.Second
tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{thisdir + "testdata/test_multiline.log"}
tt.MultilineConfig = MultilineConfig{
Pattern: `^[^\[]`,
MatchWhichLine: Previous,
InvertMatch: false,
Timeout: &internal.Duration{Duration: duration},
}
tt.SetParserFunc(createGrokParser)
err := tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
assert.NoError(t, tt.Start(&acc))
acc.Wait(3)
assert.Equal(t, uint64(3), acc.NMetrics())
// Close tailer, so multiline buffer is flushed
tt.Stop()
acc.Wait(4)
expectedPath := thisdir + "testdata/test_multiline.log"
acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{
"message": "HelloExample: This is warn",
},
map[string]string{
"path": expectedPath,
"loglevel": "WARN",
})
}
func createGrokParser() (parsers.Parser, error) {
grokConfig := &parsers.Config{
MetricName: "tail_grok",
GrokPatterns: []string{"%{TEST_LOG_MULTILINE}"},
GrokCustomPatternFiles: []string{getCurrentDir() + "testdata/test-patterns"},
DataFormat: "grok",
}
parser, err := parsers.NewParser(grokConfig)
return parser, err
}
// The csv parser should only parse the header line once per file.
func TestCSVHeadersParsedOnce(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
@ -204,6 +374,11 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
testutil.IgnoreTime())
}
func getCurrentDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "tail_test.go", "", 1)
}
func TestCharacterEncoding(t *testing.T) {
full := []telegraf.Metric{
testutil.MustMetric("cpu",

View File

@ -0,0 +1,3 @@
# Test multiline
# [04/Jun/2016:12:41:45 +0100] DEBUG HelloExample: This is debug
TEST_LOG_MULTILINE \[%{HTTPDATE:timestamp:ts-httpd}\] %{WORD:loglevel:tag} %{GREEDYDATA:message}

View File

@ -0,0 +1,7 @@
[04/Jun/2016:12:41:45 +0100] DEBUG HelloExample: This is debug
[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info
[04/Jun/2016:12:41:46 +0100] ERROR HelloExample: Sorry, something wrong!
java.lang.ArithmeticException: / by zero
at com.foo.HelloExample2.divide(HelloExample2.java:24)
at com.foo.HelloExample2.main(HelloExample2.java:14)
[04/Jun/2016:12:41:48 +0100] WARN HelloExample: This is warn