2016-04-23 05:47:26 +08:00
|
|
|
package tail
|
2016-04-27 00:43:41 +08:00
|
|
|
|
|
|
|
|
import (
|
2019-09-24 06:39:50 +08:00
|
|
|
"bytes"
|
|
|
|
|
"log"
|
2016-04-27 00:43:41 +08:00
|
|
|
"os"
|
2020-11-23 23:40:32 +08:00
|
|
|
"path/filepath"
|
2021-03-05 03:37:57 +08:00
|
|
|
"runtime"
|
2016-04-27 00:43:41 +08:00
|
|
|
"testing"
|
2019-08-22 07:30:55 +08:00
|
|
|
"time"
|
2016-04-27 00:43:41 +08:00
|
|
|
|
2023-01-27 04:51:39 +08:00
|
|
|
"github.com/google/go-cmp/cmp"
|
2020-11-23 23:40:32 +08:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
|
2019-08-22 07:30:55 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
2021-04-10 01:15:04 +08:00
|
|
|
"github.com/influxdata/telegraf/config"
|
2023-01-27 04:51:39 +08:00
|
|
|
"github.com/influxdata/telegraf/metric"
|
2019-08-22 07:30:55 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
2022-06-30 22:10:27 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/parsers/grok"
|
2020-07-08 03:43:32 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
2019-08-22 07:30:55 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/parsers/json"
|
2016-04-27 00:43:41 +08:00
|
|
|
"github.com/influxdata/telegraf/testutil"
|
2020-11-23 23:40:32 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
testdataDir = getTestdataDir()
|
2016-04-27 00:43:41 +08:00
|
|
|
)
|
|
|
|
|
|
2023-05-24 04:17:11 +08:00
|
|
|
func NewInfluxParser() (telegraf.Parser, error) {
|
2022-07-07 04:23:13 +08:00
|
|
|
parser := &influx.Parser{}
|
|
|
|
|
err := parser.Init()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return parser, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
func NewTestTail() *Tail {
|
|
|
|
|
offsetsMutex.Lock()
|
|
|
|
|
offsetsCopy := make(map[string]int64, len(offsets))
|
|
|
|
|
for k, v := range offsets {
|
|
|
|
|
offsetsCopy[k] = v
|
|
|
|
|
}
|
|
|
|
|
offsetsMutex.Unlock()
|
|
|
|
|
watchMethod := defaultWatchMethod
|
|
|
|
|
|
|
|
|
|
if runtime.GOOS == "windows" {
|
|
|
|
|
watchMethod = "poll"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &Tail{
|
|
|
|
|
FromBeginning: false,
|
|
|
|
|
MaxUndeliveredLines: 1000,
|
|
|
|
|
offsets: offsetsCopy,
|
|
|
|
|
WatchMethod: watchMethod,
|
2021-04-01 00:06:13 +08:00
|
|
|
PathTag: "path",
|
2021-03-05 03:37:57 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-08 03:43:32 +08:00
|
|
|
func TestTailBadLine(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2016-04-27 00:43:41 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
2020-03-28 06:40:08 +08:00
|
|
|
|
2020-07-08 03:43:32 +08:00
|
|
|
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
|
2020-03-28 06:40:08 +08:00
|
|
|
require.NoError(t, err)
|
2016-04-27 00:43:41 +08:00
|
|
|
|
2020-07-08 03:43:32 +08:00
|
|
|
// Write good metric so we can detect when processing is complete
|
|
|
|
|
_, err = tmpfile.WriteString("cpu usage_idle=100\n")
|
2016-04-27 00:43:41 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
2021-04-09 00:43:39 +08:00
|
|
|
require.NoError(t, tmpfile.Close())
|
2020-07-11 01:59:06 +08:00
|
|
|
|
2020-07-11 04:58:38 +08:00
|
|
|
buf := &bytes.Buffer{}
|
|
|
|
|
log.SetOutput(buf)
|
|
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
tt := NewTestTail()
|
2019-09-24 06:39:50 +08:00
|
|
|
tt.Log = testutil.Logger{}
|
2016-04-27 00:43:41 +08:00
|
|
|
tt.FromBeginning = true
|
|
|
|
|
tt.Files = []string{tmpfile.Name()}
|
2022-07-07 04:23:13 +08:00
|
|
|
tt.SetParserFunc(NewInfluxParser)
|
2020-03-28 06:40:08 +08:00
|
|
|
|
|
|
|
|
err = tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
2016-04-27 00:43:41 +08:00
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
require.NoError(t, tt.Start(&acc))
|
2019-09-24 06:39:50 +08:00
|
|
|
|
2018-03-09 05:03:48 +08:00
|
|
|
require.NoError(t, acc.GatherError(tt.Gather))
|
2016-04-27 00:43:41 +08:00
|
|
|
|
2020-07-08 03:43:32 +08:00
|
|
|
acc.Wait(1)
|
2016-04-27 00:43:41 +08:00
|
|
|
|
2020-07-03 06:20:47 +08:00
|
|
|
tt.Stop()
|
2021-11-18 22:22:43 +08:00
|
|
|
require.Contains(t, buf.String(), "Malformed log line")
|
2016-04-27 00:43:41 +08:00
|
|
|
}
|
2017-06-17 04:16:48 +08:00
|
|
|
|
2022-03-30 23:00:16 +08:00
|
|
|
func TestColoredLine(t *testing.T) {
|
|
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
|
|
|
|
_, err = tmpfile.WriteString("cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, tmpfile.Close())
|
|
|
|
|
|
|
|
|
|
tt := NewTestTail()
|
|
|
|
|
tt.Log = testutil.Logger{}
|
|
|
|
|
tt.FromBeginning = true
|
|
|
|
|
tt.Filters = []string{"ansi_color"}
|
|
|
|
|
tt.Files = []string{tmpfile.Name()}
|
2022-07-07 04:23:13 +08:00
|
|
|
tt.SetParserFunc(NewInfluxParser)
|
2022-03-30 23:00:16 +08:00
|
|
|
|
|
|
|
|
err = tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
require.NoError(t, tt.Start(&acc))
|
|
|
|
|
defer tt.Stop()
|
|
|
|
|
require.NoError(t, acc.GatherError(tt.Gather))
|
|
|
|
|
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
acc.AssertContainsFields(t, "cpu",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(100),
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsFields(t, "cpu2",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(200),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
func TestTailDosLineEndings(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2017-06-17 04:16:48 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
|
|
|
|
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
|
|
|
|
|
require.NoError(t, err)
|
2021-04-09 00:43:39 +08:00
|
|
|
require.NoError(t, tmpfile.Close())
|
2017-06-17 04:16:48 +08:00
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
tt := NewTestTail()
|
2019-09-24 06:39:50 +08:00
|
|
|
tt.Log = testutil.Logger{}
|
2017-06-17 04:16:48 +08:00
|
|
|
tt.FromBeginning = true
|
|
|
|
|
tt.Files = []string{tmpfile.Name()}
|
2022-07-07 04:23:13 +08:00
|
|
|
tt.SetParserFunc(NewInfluxParser)
|
2020-03-28 06:40:08 +08:00
|
|
|
|
|
|
|
|
err = tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
2017-06-17 04:16:48 +08:00
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
require.NoError(t, tt.Start(&acc))
|
2020-03-28 06:40:08 +08:00
|
|
|
defer tt.Stop()
|
2017-06-17 04:16:48 +08:00
|
|
|
require.NoError(t, acc.GatherError(tt.Gather))
|
|
|
|
|
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
acc.AssertContainsFields(t, "cpu",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(100),
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsFields(t, "cpu2",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(200),
|
|
|
|
|
})
|
|
|
|
|
}
|
2019-08-22 07:30:55 +08:00
|
|
|
|
2020-09-29 06:06:00 +08:00
|
|
|
func TestGrokParseLogFilesWithMultiline(t *testing.T) {
|
|
|
|
|
//we make sure the timeout won't kick in
|
2021-04-10 01:15:04 +08:00
|
|
|
d, _ := time.ParseDuration("100s")
|
|
|
|
|
duration := config.Duration(d)
|
|
|
|
|
tt := NewTail()
|
2020-09-29 06:06:00 +08:00
|
|
|
tt.Log = testutil.Logger{}
|
|
|
|
|
tt.FromBeginning = true
|
2020-11-23 23:40:32 +08:00
|
|
|
tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")}
|
2020-09-29 06:06:00 +08:00
|
|
|
tt.MultilineConfig = MultilineConfig{
|
|
|
|
|
Pattern: `^[^\[]`,
|
|
|
|
|
MatchWhichLine: Previous,
|
|
|
|
|
InvertMatch: false,
|
2021-04-10 01:15:04 +08:00
|
|
|
Timeout: &duration,
|
2020-09-29 06:06:00 +08:00
|
|
|
}
|
|
|
|
|
tt.SetParserFunc(createGrokParser)
|
|
|
|
|
|
|
|
|
|
err := tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
2021-11-18 22:22:43 +08:00
|
|
|
require.NoError(t, tt.Start(&acc))
|
2020-09-29 06:06:00 +08:00
|
|
|
defer tt.Stop()
|
|
|
|
|
|
|
|
|
|
acc.Wait(3)
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
expectedPath := filepath.Join(testdataDir, "test_multiline.log")
|
2020-09-29 06:06:00 +08:00
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"message": "HelloExample: This is debug",
|
|
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "DEBUG",
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"message": "HelloExample: This is info",
|
|
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "INFO",
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
2022-11-11 02:41:43 +08:00
|
|
|
"message": "HelloExample: Sorry, something wrong! java.lang.ArithmeticException: / by zero\t" +
|
|
|
|
|
"at com.foo.HelloExample2.divide(HelloExample2.java:24)\tat com.foo.HelloExample2.main(HelloExample2.java:14)",
|
2020-09-29 06:06:00 +08:00
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "ERROR",
|
|
|
|
|
})
|
|
|
|
|
|
2021-11-18 22:22:43 +08:00
|
|
|
require.Equal(t, uint64(3), acc.NMetrics())
|
2020-09-29 06:06:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2020-09-29 06:06:00 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
// This seems necessary in order to get the test to read the following lines.
|
2020-09-29 06:06:00 +08:00
|
|
|
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is fluff\r\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, tmpfile.Sync())
|
|
|
|
|
|
|
|
|
|
// set tight timeout for tests
|
2021-04-10 01:15:04 +08:00
|
|
|
d := 10 * time.Millisecond
|
|
|
|
|
duration := config.Duration(d)
|
|
|
|
|
tt := NewTail()
|
2020-09-29 06:06:00 +08:00
|
|
|
|
|
|
|
|
tt.Log = testutil.Logger{}
|
|
|
|
|
tt.FromBeginning = true
|
|
|
|
|
tt.Files = []string{tmpfile.Name()}
|
|
|
|
|
tt.MultilineConfig = MultilineConfig{
|
|
|
|
|
Pattern: `^[^\[]`,
|
|
|
|
|
MatchWhichLine: Previous,
|
|
|
|
|
InvertMatch: false,
|
2021-04-10 01:15:04 +08:00
|
|
|
Timeout: &duration,
|
2020-09-29 06:06:00 +08:00
|
|
|
}
|
|
|
|
|
tt.SetParserFunc(createGrokParser)
|
|
|
|
|
|
|
|
|
|
err = tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
2021-11-18 22:22:43 +08:00
|
|
|
require.NoError(t, tt.Start(&acc))
|
2020-09-29 06:06:00 +08:00
|
|
|
time.Sleep(11 * time.Millisecond) // will force timeout
|
|
|
|
|
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, tmpfile.Sync())
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
time.Sleep(11 * time.Millisecond) // will force timeout
|
|
|
|
|
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] WARN HelloExample: This is warn\r\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, tmpfile.Sync())
|
|
|
|
|
acc.Wait(3)
|
|
|
|
|
tt.Stop()
|
2021-11-18 22:22:43 +08:00
|
|
|
require.Equal(t, uint64(3), acc.NMetrics())
|
2020-09-29 06:06:00 +08:00
|
|
|
expectedPath := tmpfile.Name()
|
|
|
|
|
|
|
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"message": "HelloExample: This is info",
|
|
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "INFO",
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"message": "HelloExample: This is warn",
|
|
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "WARN",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *testing.T) {
|
|
|
|
|
//we make sure the timeout won't kick in
|
2021-04-10 01:15:04 +08:00
|
|
|
duration := config.Duration(100 * time.Second)
|
2020-09-29 06:06:00 +08:00
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
tt := NewTestTail()
|
2020-09-29 06:06:00 +08:00
|
|
|
tt.Log = testutil.Logger{}
|
|
|
|
|
tt.FromBeginning = true
|
2020-11-23 23:40:32 +08:00
|
|
|
tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")}
|
2020-09-29 06:06:00 +08:00
|
|
|
tt.MultilineConfig = MultilineConfig{
|
|
|
|
|
Pattern: `^[^\[]`,
|
|
|
|
|
MatchWhichLine: Previous,
|
|
|
|
|
InvertMatch: false,
|
2021-04-10 01:15:04 +08:00
|
|
|
Timeout: &duration,
|
2020-09-29 06:06:00 +08:00
|
|
|
}
|
|
|
|
|
tt.SetParserFunc(createGrokParser)
|
|
|
|
|
|
|
|
|
|
err := tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
2021-11-18 22:22:43 +08:00
|
|
|
require.NoError(t, tt.Start(&acc))
|
2020-09-29 06:06:00 +08:00
|
|
|
acc.Wait(3)
|
2021-11-18 22:22:43 +08:00
|
|
|
require.Equal(t, uint64(3), acc.NMetrics())
|
2020-09-29 06:06:00 +08:00
|
|
|
// Close tailer, so multiline buffer is flushed
|
|
|
|
|
tt.Stop()
|
|
|
|
|
acc.Wait(4)
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
expectedPath := filepath.Join(testdataDir, "test_multiline.log")
|
2020-09-29 06:06:00 +08:00
|
|
|
acc.AssertContainsTaggedFields(t, "tail_grok",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"message": "HelloExample: This is warn",
|
|
|
|
|
},
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": expectedPath,
|
|
|
|
|
"loglevel": "WARN",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-24 04:17:11 +08:00
|
|
|
func createGrokParser() (telegraf.Parser, error) {
|
2022-06-30 22:10:27 +08:00
|
|
|
parser := &grok.Parser{
|
|
|
|
|
Measurement: "tail_grok",
|
|
|
|
|
Patterns: []string{"%{TEST_LOG_MULTILINE}"},
|
|
|
|
|
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},
|
|
|
|
|
Log: testutil.Logger{},
|
2020-09-29 06:06:00 +08:00
|
|
|
}
|
2022-06-30 22:10:27 +08:00
|
|
|
err := parser.Init()
|
2020-09-29 06:06:00 +08:00
|
|
|
return parser, err
|
|
|
|
|
}
|
|
|
|
|
|
2019-08-22 07:30:55 +08:00
|
|
|
// The csv parser should only parse the header line once per file.
|
|
|
|
|
func TestCSVHeadersParsedOnce(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2019-08-22 07:30:55 +08:00
|
|
|
require.NoError(t, err)
|
2020-07-11 01:59:06 +08:00
|
|
|
defer os.Remove(tmpfile.Name())
|
2019-08-22 07:30:55 +08:00
|
|
|
|
|
|
|
|
_, err = tmpfile.WriteString(`
|
|
|
|
|
measurement,time_idle
|
|
|
|
|
cpu,42
|
|
|
|
|
cpu,42
|
|
|
|
|
`)
|
|
|
|
|
require.NoError(t, err)
|
2021-04-09 00:43:39 +08:00
|
|
|
require.NoError(t, tmpfile.Close())
|
2019-08-22 07:30:55 +08:00
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
plugin := NewTestTail()
|
2019-09-24 06:39:50 +08:00
|
|
|
plugin.Log = testutil.Logger{}
|
2019-08-22 07:30:55 +08:00
|
|
|
plugin.FromBeginning = true
|
|
|
|
|
plugin.Files = []string{tmpfile.Name()}
|
2023-05-24 04:17:11 +08:00
|
|
|
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
2022-01-13 06:54:42 +08:00
|
|
|
parser := csv.Parser{
|
2019-08-22 07:30:55 +08:00
|
|
|
MeasurementColumn: "measurement",
|
|
|
|
|
HeaderRowCount: 1,
|
|
|
|
|
TimeFunc: func() time.Time { return time.Unix(0, 0) },
|
2022-01-13 06:54:42 +08:00
|
|
|
}
|
|
|
|
|
err := parser.Init()
|
|
|
|
|
return &parser, err
|
2019-08-22 07:30:55 +08:00
|
|
|
})
|
2020-03-28 06:40:08 +08:00
|
|
|
|
|
|
|
|
err = plugin.Init()
|
|
|
|
|
require.NoError(t, err)
|
2019-08-22 07:30:55 +08:00
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
err = plugin.Start(&acc)
|
|
|
|
|
require.NoError(t, err)
|
2020-03-28 06:40:08 +08:00
|
|
|
defer plugin.Stop()
|
2019-08-22 07:30:55 +08:00
|
|
|
err = plugin.Gather(&acc)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
plugin.Stop()
|
|
|
|
|
|
|
|
|
|
expected := []telegraf.Metric{
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": tmpfile.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
2020-05-27 05:16:48 +08:00
|
|
|
"time_idle": 42,
|
2019-08-22 07:30:55 +08:00
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": tmpfile.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
2020-05-27 05:16:48 +08:00
|
|
|
"time_idle": 42,
|
2019-08-22 07:30:55 +08:00
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
}
|
|
|
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-17 06:05:48 +08:00
|
|
|
func TestCSVMultiHeaderWithSkipRowANDColumn(t *testing.T) {
|
|
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
|
|
|
|
|
|
|
|
|
_, err = tmpfile.WriteString(`garbage nonsense
|
|
|
|
|
skip,measurement,value
|
|
|
|
|
row,1,2
|
|
|
|
|
skip1,cpu,42
|
|
|
|
|
skip2,mem,100
|
|
|
|
|
`)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, tmpfile.Close())
|
|
|
|
|
|
|
|
|
|
plugin := NewTestTail()
|
|
|
|
|
plugin.Log = testutil.Logger{}
|
|
|
|
|
plugin.FromBeginning = true
|
|
|
|
|
plugin.Files = []string{tmpfile.Name()}
|
2023-05-24 04:17:11 +08:00
|
|
|
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
2022-01-13 06:54:42 +08:00
|
|
|
parser := csv.Parser{
|
2021-11-17 06:05:48 +08:00
|
|
|
MeasurementColumn: "measurement1",
|
|
|
|
|
HeaderRowCount: 2,
|
|
|
|
|
SkipRows: 1,
|
|
|
|
|
SkipColumns: 1,
|
|
|
|
|
TimeFunc: func() time.Time { return time.Unix(0, 0) },
|
2022-01-13 06:54:42 +08:00
|
|
|
}
|
|
|
|
|
err := parser.Init()
|
|
|
|
|
return &parser, err
|
2021-11-17 06:05:48 +08:00
|
|
|
})
|
|
|
|
|
|
|
|
|
|
err = plugin.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
err = plugin.Start(&acc)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer plugin.Stop()
|
|
|
|
|
err = plugin.Gather(&acc)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
plugin.Stop()
|
|
|
|
|
|
|
|
|
|
expected := []telegraf.Metric{
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": tmpfile.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"value2": 42,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
testutil.MustMetric("mem",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": tmpfile.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"value2": 100,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
}
|
|
|
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
|
|
|
|
|
}
|
|
|
|
|
|
2019-08-22 07:30:55 +08:00
|
|
|
// Ensure that the first line can produce multiple metrics (#6138)
|
|
|
|
|
func TestMultipleMetricsOnFirstLine(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2019-08-22 07:30:55 +08:00
|
|
|
require.NoError(t, err)
|
2020-07-11 01:59:06 +08:00
|
|
|
defer os.Remove(tmpfile.Name())
|
2019-08-22 07:30:55 +08:00
|
|
|
|
|
|
|
|
_, err = tmpfile.WriteString(`
|
|
|
|
|
[{"time_idle": 42}, {"time_idle": 42}]
|
|
|
|
|
`)
|
|
|
|
|
require.NoError(t, err)
|
2021-04-09 00:43:39 +08:00
|
|
|
require.NoError(t, tmpfile.Close())
|
2019-08-22 07:30:55 +08:00
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
plugin := NewTestTail()
|
2019-09-24 06:39:50 +08:00
|
|
|
plugin.Log = testutil.Logger{}
|
2019-08-22 07:30:55 +08:00
|
|
|
plugin.FromBeginning = true
|
|
|
|
|
plugin.Files = []string{tmpfile.Name()}
|
2021-04-01 00:06:13 +08:00
|
|
|
plugin.PathTag = "customPathTagMyFile"
|
2023-05-24 04:17:11 +08:00
|
|
|
plugin.SetParserFunc(func() (telegraf.Parser, error) {
|
2022-06-22 23:56:51 +08:00
|
|
|
p := &json.Parser{MetricName: "cpu"}
|
|
|
|
|
err := p.Init()
|
|
|
|
|
return p, err
|
2019-08-22 07:30:55 +08:00
|
|
|
})
|
2020-03-28 06:40:08 +08:00
|
|
|
|
|
|
|
|
err = plugin.Init()
|
|
|
|
|
require.NoError(t, err)
|
2019-08-22 07:30:55 +08:00
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
err = plugin.Start(&acc)
|
|
|
|
|
require.NoError(t, err)
|
2020-03-28 06:40:08 +08:00
|
|
|
defer plugin.Stop()
|
2019-08-22 07:30:55 +08:00
|
|
|
err = plugin.Gather(&acc)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
plugin.Stop()
|
|
|
|
|
|
|
|
|
|
expected := []telegraf.Metric{
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
2021-04-01 00:06:13 +08:00
|
|
|
"customPathTagMyFile": tmpfile.Name(),
|
2019-08-22 07:30:55 +08:00
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"time_idle": 42.0,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
2021-04-01 00:06:13 +08:00
|
|
|
"customPathTagMyFile": tmpfile.Name(),
|
2019-08-22 07:30:55 +08:00
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"time_idle": 42.0,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0)),
|
|
|
|
|
}
|
|
|
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
|
|
|
|
|
testutil.IgnoreTime())
|
|
|
|
|
}
|
2020-07-08 03:43:32 +08:00
|
|
|
|
|
|
|
|
func TestCharacterEncoding(t *testing.T) {
|
|
|
|
|
full := []telegraf.Metric{
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"cpu": "cpu0",
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_active": 11.9,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"cpu": "cpu1",
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_active": 26.0,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"cpu": "cpu2",
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_active": 14.0,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"cpu": "cpu3",
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_active": 20.4,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
testutil.MustMetric("cpu",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"cpu": "cpu-total",
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_active": 18.4,
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
|
2021-09-15 04:56:49 +08:00
|
|
|
watchMethod := defaultWatchMethod
|
|
|
|
|
if runtime.GOOS == "windows" {
|
|
|
|
|
watchMethod = "poll"
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-08 03:43:32 +08:00
|
|
|
tests := []struct {
|
2021-09-15 04:56:49 +08:00
|
|
|
name string
|
|
|
|
|
testfiles string
|
|
|
|
|
fromBeginning bool
|
|
|
|
|
characterEncoding string
|
|
|
|
|
offset int64
|
|
|
|
|
expected []telegraf.Metric
|
2020-07-08 03:43:32 +08:00
|
|
|
}{
|
|
|
|
|
{
|
2021-09-15 04:56:49 +08:00
|
|
|
name: "utf-8",
|
|
|
|
|
testfiles: "cpu-utf-8.influx",
|
|
|
|
|
fromBeginning: true,
|
|
|
|
|
characterEncoding: "utf-8",
|
|
|
|
|
expected: full,
|
2020-07-08 03:43:32 +08:00
|
|
|
},
|
|
|
|
|
{
|
2021-09-15 04:56:49 +08:00
|
|
|
name: "utf-8 seek",
|
|
|
|
|
testfiles: "cpu-utf-8.influx",
|
|
|
|
|
characterEncoding: "utf-8",
|
|
|
|
|
offset: 0x33,
|
|
|
|
|
expected: full[1:],
|
2020-07-08 03:43:32 +08:00
|
|
|
},
|
|
|
|
|
{
|
2021-09-15 04:56:49 +08:00
|
|
|
name: "utf-16le",
|
|
|
|
|
testfiles: "cpu-utf-16le.influx",
|
|
|
|
|
fromBeginning: true,
|
|
|
|
|
characterEncoding: "utf-16le",
|
|
|
|
|
expected: full,
|
2020-07-08 03:43:32 +08:00
|
|
|
},
|
|
|
|
|
{
|
2021-09-15 04:56:49 +08:00
|
|
|
name: "utf-16le seek",
|
|
|
|
|
testfiles: "cpu-utf-16le.influx",
|
|
|
|
|
characterEncoding: "utf-16le",
|
|
|
|
|
offset: 0x68,
|
|
|
|
|
expected: full[1:],
|
2020-07-08 03:43:32 +08:00
|
|
|
},
|
|
|
|
|
{
|
2021-09-15 04:56:49 +08:00
|
|
|
name: "utf-16be",
|
|
|
|
|
testfiles: "cpu-utf-16be.influx",
|
|
|
|
|
fromBeginning: true,
|
|
|
|
|
characterEncoding: "utf-16be",
|
|
|
|
|
expected: full,
|
2020-07-08 03:43:32 +08:00
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
for _, tt := range tests {
|
|
|
|
|
t.Run(tt.name, func(t *testing.T) {
|
2021-09-15 04:56:49 +08:00
|
|
|
plugin := &Tail{
|
|
|
|
|
Files: []string{filepath.Join(testdataDir, tt.testfiles)},
|
|
|
|
|
FromBeginning: tt.fromBeginning,
|
|
|
|
|
MaxUndeliveredLines: 1000,
|
|
|
|
|
Log: testutil.Logger{},
|
|
|
|
|
CharacterEncoding: tt.characterEncoding,
|
|
|
|
|
WatchMethod: watchMethod,
|
|
|
|
|
}
|
|
|
|
|
|
2022-07-07 04:23:13 +08:00
|
|
|
plugin.SetParserFunc(NewInfluxParser)
|
2023-03-02 06:34:48 +08:00
|
|
|
require.NoError(t, plugin.Init())
|
2020-07-08 03:43:32 +08:00
|
|
|
|
|
|
|
|
if tt.offset != 0 {
|
2021-09-15 04:56:49 +08:00
|
|
|
plugin.offsets = map[string]int64{
|
|
|
|
|
plugin.Files[0]: tt.offset,
|
2020-07-08 03:43:32 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var acc testutil.Accumulator
|
2023-03-02 06:34:48 +08:00
|
|
|
require.NoError(t, plugin.Start(&acc))
|
2020-07-08 03:43:32 +08:00
|
|
|
acc.Wait(len(tt.expected))
|
2021-09-15 04:56:49 +08:00
|
|
|
plugin.Stop()
|
2020-07-08 03:43:32 +08:00
|
|
|
|
|
|
|
|
actual := acc.GetTelegrafMetrics()
|
|
|
|
|
for _, m := range actual {
|
|
|
|
|
m.RemoveTag("path")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime())
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
2020-08-01 03:31:02 +08:00
|
|
|
|
|
|
|
|
func TestTailEOF(t *testing.T) {
|
2021-09-29 05:16:32 +08:00
|
|
|
tmpfile, err := os.CreateTemp("", "")
|
2020-08-01 03:31:02 +08:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(tmpfile.Name())
|
|
|
|
|
_, err = tmpfile.WriteString("cpu usage_idle=100\r\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = tmpfile.Sync()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
2021-03-05 03:37:57 +08:00
|
|
|
tt := NewTestTail()
|
2020-08-01 03:31:02 +08:00
|
|
|
tt.Log = testutil.Logger{}
|
|
|
|
|
tt.FromBeginning = true
|
|
|
|
|
tt.Files = []string{tmpfile.Name()}
|
2022-07-07 04:23:13 +08:00
|
|
|
tt.SetParserFunc(NewInfluxParser)
|
2020-08-01 03:31:02 +08:00
|
|
|
|
|
|
|
|
err = tt.Init()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc := testutil.Accumulator{}
|
|
|
|
|
require.NoError(t, tt.Start(&acc))
|
|
|
|
|
defer tt.Stop()
|
|
|
|
|
require.NoError(t, acc.GatherError(tt.Gather))
|
|
|
|
|
acc.Wait(1) // input hits eof
|
|
|
|
|
|
|
|
|
|
_, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = tmpfile.Sync()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
acc.Wait(2)
|
|
|
|
|
require.NoError(t, acc.GatherError(tt.Gather))
|
|
|
|
|
acc.AssertContainsFields(t, "cpu",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(100),
|
|
|
|
|
})
|
|
|
|
|
acc.AssertContainsFields(t, "cpu2",
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"usage_idle": float64(200),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
err = tmpfile.Close()
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
2020-11-23 23:40:32 +08:00
|
|
|
|
2023-01-27 04:51:39 +08:00
|
|
|
func TestCSVBehavior(t *testing.T) {
|
|
|
|
|
// Prepare the input file
|
|
|
|
|
input, err := os.CreateTemp("", "")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer os.Remove(input.Name())
|
|
|
|
|
// Write header
|
|
|
|
|
_, err = input.WriteString("a,b\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, input.Sync())
|
|
|
|
|
|
|
|
|
|
// Setup the CSV parser creator function
|
2023-05-24 04:17:11 +08:00
|
|
|
parserFunc := func() (telegraf.Parser, error) {
|
2023-01-27 04:51:39 +08:00
|
|
|
parser := &csv.Parser{
|
|
|
|
|
MetricName: "tail",
|
|
|
|
|
HeaderRowCount: 1,
|
|
|
|
|
}
|
|
|
|
|
err := parser.Init()
|
|
|
|
|
return parser, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Setup the plugin
|
|
|
|
|
plugin := &Tail{
|
|
|
|
|
Files: []string{input.Name()},
|
|
|
|
|
FromBeginning: true,
|
|
|
|
|
MaxUndeliveredLines: 1000,
|
|
|
|
|
offsets: make(map[string]int64, 0),
|
|
|
|
|
PathTag: "path",
|
|
|
|
|
Log: testutil.Logger{},
|
|
|
|
|
}
|
|
|
|
|
plugin.SetParserFunc(parserFunc)
|
|
|
|
|
require.NoError(t, plugin.Init())
|
|
|
|
|
|
|
|
|
|
expected := []telegraf.Metric{
|
|
|
|
|
metric.New(
|
|
|
|
|
"tail",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": input.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"a": int64(1),
|
|
|
|
|
"b": int64(2),
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
metric.New(
|
|
|
|
|
"tail",
|
|
|
|
|
map[string]string{
|
|
|
|
|
"path": input.Name(),
|
|
|
|
|
},
|
|
|
|
|
map[string]interface{}{
|
|
|
|
|
"a": int64(3),
|
|
|
|
|
"b": int64(4),
|
|
|
|
|
},
|
|
|
|
|
time.Unix(0, 0),
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var acc testutil.Accumulator
|
|
|
|
|
require.NoError(t, plugin.Start(&acc))
|
|
|
|
|
defer plugin.Stop()
|
|
|
|
|
|
|
|
|
|
// Write the first line of data
|
|
|
|
|
_, err = input.WriteString("1,2\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, input.Sync())
|
|
|
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
|
|
|
|
|
|
|
|
// Write another line of data
|
|
|
|
|
_, err = input.WriteString("3,4\n")
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NoError(t, input.Sync())
|
|
|
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
|
|
|
require.Eventuallyf(t, func() bool {
|
|
|
|
|
acc.Lock()
|
|
|
|
|
defer acc.Unlock()
|
|
|
|
|
return acc.NMetrics() >= uint64(len(expected))
|
|
|
|
|
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
|
|
|
|
|
|
|
|
|
|
// Check the result
|
|
|
|
|
options := []cmp.Option{
|
|
|
|
|
testutil.SortMetrics(),
|
|
|
|
|
testutil.IgnoreTime(),
|
|
|
|
|
}
|
|
|
|
|
actual := acc.GetTelegrafMetrics()
|
|
|
|
|
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
|
|
|
|
|
|
|
|
|
// Close the input file
|
|
|
|
|
require.NoError(t, input.Close())
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-23 23:40:32 +08:00
|
|
|
func getTestdataDir() string {
|
|
|
|
|
dir, err := os.Getwd()
|
|
|
|
|
if err != nil {
|
|
|
|
|
// if we cannot even establish the test directory, further progress is meaningless
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return filepath.Join(dir, "testdata")
|
|
|
|
|
}
|