diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md index 7e6358129..965af7734 100644 --- a/plugins/inputs/tail/README.md +++ b/plugins/inputs/tail/README.md @@ -12,7 +12,7 @@ tail -F --lines=0 myfile.log that it will be compatible with log-rotated files, and that it will retry on inaccessible files. - `--lines=0` means that it will start at the end of the file (unless -the `from_beginning` option is set). +the `initial_read_offset` option is set). see for more details. @@ -56,8 +56,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## files = ["/var/mymetrics.out"] - ## Read file from beginning. - # from_beginning = false + ## Offset to start reading at + ## The following methods are available: + ## beginning -- start reading from the beginning of the file ignoring any persisted offset + ## end -- start reading from the end of the file ignoring any persisted offset + ## saved-or-beginning -- use the persisted offset of the file or, if no offset persisted, start from the beginning of the file + ## saved-or-end -- use the persisted offset of the file or, if no offset persisted, start from the end of the file + # initial_read_offset = "saved-or-end" ## Whether file is a named pipe # pipe = false diff --git a/plugins/inputs/tail/sample.conf b/plugins/inputs/tail/sample.conf index 1863546ee..97c3290d9 100644 --- a/plugins/inputs/tail/sample.conf +++ b/plugins/inputs/tail/sample.conf @@ -12,8 +12,13 @@ ## files = ["/var/mymetrics.out"] - ## Read file from beginning. - # from_beginning = false + ## Offset to start reading at + ## The following methods are available: + ## beginning -- start reading from the beginning of the file ignoring any persisted offset + ## end -- start reading from the end of the file ignoring any persisted offset + ## saved-or-beginning -- use the persisted offset of the file or, if no offset persisted, start from the beginning of the file + ## saved-or-end -- use the persisted offset of the file or, if no offset persisted, start from the end of the file + # initial_read_offset = "saved-or-end" ## Whether file is a named pipe # pipe = false diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index e42dae837..251927054 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -37,7 +37,8 @@ var ( type Tail struct { Files []string `toml:"files"` - FromBeginning bool `toml:"from_beginning"` + FromBeginning bool `toml:"from_beginning" deprecated:"1.34.0;1.40.0;use 'initial_read_offset' with value 'beginning' instead"` + InitialReadOffset string `toml:"initial_read_offset"` Pipe bool `toml:"pipe"` WatchMethod string `toml:"watch_method"` MaxUndeliveredLines int `toml:"max_undelivered_lines"` @@ -89,6 +90,14 @@ func (t *Tail) Init() error { // init offsets t.offsets = make(map[string]int64) + if t.InitialReadOffset == "" { + if t.FromBeginning { + t.InitialReadOffset = "beginning" + } else { + t.InitialReadOffset = "save-or-end" + } + } + var err error t.decoder, err = encoding.NewDecoder(t.CharacterEncoding) return err @@ -121,7 +130,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { t.tailers = make(map[string]*tail.Tail) - err = t.tailNewFiles(t.FromBeginning) + err = t.tailNewFiles() + if err != nil { + return err + } // assumption that once Start is called, all parallel plugins have already been initialized offsetsMutex.Lock() @@ -131,6 +143,31 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { return err } +func (t *Tail) getSeekInfo(file string) (*tail.SeekInfo, error) { + switch t.InitialReadOffset { + case "beginning": + return &tail.SeekInfo{Whence: 0, Offset: 0}, nil + case "end": + return &tail.SeekInfo{Whence: 2, Offset: 0}, nil + case "", "save-or-end": + if offset, ok := t.offsets[file]; ok { + t.Log.Debugf("Using offset %d for %q", offset, file) + return &tail.SeekInfo{Whence: 0, Offset: offset}, nil + } else { + return &tail.SeekInfo{Whence: 2, Offset: 0}, nil + } + case "save-or-beginning": + if offset, ok := t.offsets[file]; ok { + t.Log.Debugf("Using offset %d for %q", offset, file) + return &tail.SeekInfo{Whence: 0, Offset: offset}, nil + } else { + return &tail.SeekInfo{Whence: 0, Offset: 0}, nil + } + default: + return nil, errors.New("invalid 'initial_read_offset' setting") + } +} + func (t *Tail) GetState() interface{} { return t.offsets } @@ -147,12 +184,12 @@ func (t *Tail) SetState(state interface{}) error { } func (t *Tail) Gather(_ telegraf.Accumulator) error { - return t.tailNewFiles(true) + return t.tailNewFiles() } func (t *Tail) Stop() { for _, tailer := range t.tailers { - if !t.Pipe && !t.FromBeginning { + if !t.Pipe { // store offset for resume offset, err := tailer.Tell() if err == nil { @@ -179,7 +216,7 @@ func (t *Tail) Stop() { offsetsMutex.Unlock() } -func (t *Tail) tailNewFiles(fromBeginning bool) error { +func (t *Tail) tailNewFiles() error { var poll bool if t.WatchMethod == "poll" { poll = true @@ -197,20 +234,9 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { continue } - var seek *tail.SeekInfo - if !t.Pipe && !fromBeginning { - if offset, ok := t.offsets[file]; ok { - t.Log.Debugf("Using offset %d for %q", offset, file) - seek = &tail.SeekInfo{ - Whence: 0, - Offset: offset, - } - } else { - seek = &tail.SeekInfo{ - Whence: 2, - Offset: 0, - } - } + seek, err := t.getSeekInfo(file) + if err != nil { + return err } tailer, err := tail.TailFile(file, diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index b35177cd1..a99efd556 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -11,6 +11,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/influxdata/tail" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" @@ -43,7 +44,6 @@ func newTestTail() *Tail { } return &Tail{ - FromBeginning: false, MaxUndeliveredLines: 1000, offsets: offsetsCopy, WatchMethod: watchMethod, @@ -64,7 +64,7 @@ cpu usage_idle=100 tt := newTestTail() tt.Log = logger - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) @@ -88,7 +88,7 @@ func TestColoredLine(t *testing.T) { tt := newTestTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Filters = []string{"ansi_color"} tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) @@ -118,7 +118,7 @@ func TestTailDosLineEndings(t *testing.T) { tt := newTestTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) @@ -146,7 +146,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) { duration := config.Duration(d) tt := newTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, @@ -210,7 +210,7 @@ func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) { tt := newTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile.Name()} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, @@ -261,7 +261,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test tt := newTestTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, @@ -314,7 +314,7 @@ cpu,42 plugin := newTestTail() plugin.Log = testutil.Logger{} - plugin.FromBeginning = true + plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ @@ -388,7 +388,7 @@ skip2,mem,100 plugin := newTestTail() plugin.Log = testutil.Logger{} - plugin.FromBeginning = true + plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ @@ -446,7 +446,7 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) { plugin := newTestTail() plugin.Log = testutil.Logger{} - plugin.FromBeginning = true + plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.PathTag = "customPathTagMyFile" plugin.SetParserFunc(func() (telegraf.Parser, error) { @@ -526,7 +526,7 @@ func TestCharacterEncoding(t *testing.T) { tests := []struct { name string testfiles string - fromBeginning bool + initialReadOffset string characterEncoding string offset int64 expected []telegraf.Metric @@ -534,7 +534,7 @@ func TestCharacterEncoding(t *testing.T) { { name: "utf-8", testfiles: "cpu-utf-8.influx", - fromBeginning: true, + initialReadOffset: "beginning", characterEncoding: "utf-8", expected: full, }, @@ -548,7 +548,7 @@ func TestCharacterEncoding(t *testing.T) { { name: "utf-16le", testfiles: "cpu-utf-16le.influx", - fromBeginning: true, + initialReadOffset: "beginning", characterEncoding: "utf-16le", expected: full, }, @@ -562,7 +562,7 @@ func TestCharacterEncoding(t *testing.T) { { name: "utf-16be", testfiles: "cpu-utf-16be.influx", - fromBeginning: true, + initialReadOffset: "beginning", characterEncoding: "utf-16be", expected: full, }, @@ -571,7 +571,7 @@ func TestCharacterEncoding(t *testing.T) { t.Run(tt.name, func(t *testing.T) { plugin := &Tail{ Files: []string{filepath.Join("testdata", tt.testfiles)}, - FromBeginning: tt.fromBeginning, + InitialReadOffset: tt.initialReadOffset, MaxUndeliveredLines: 1000, Log: testutil.Logger{}, CharacterEncoding: tt.characterEncoding, @@ -615,7 +615,7 @@ func TestTailEOF(t *testing.T) { tt := newTestTail() tt.Log = testutil.Logger{} - tt.FromBeginning = true + tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) @@ -667,7 +667,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := &Tail{ Files: []string{input.Name()}, - FromBeginning: true, + InitialReadOffset: "beginning", MaxUndeliveredLines: 1000, offsets: make(map[string]int64, 0), PathTag: "path", @@ -800,3 +800,166 @@ func TestStatePersistence(t *testing.T) { require.True(t, ok, "state is not a map[string]int64") require.Equal(t, expectedState, actualState) } + +func TestGetSeekInfo(t *testing.T) { + tests := []struct { + name string + offsets map[string]int64 + file string + InitialReadOffset string + expected *tail.SeekInfo + }{ + { + name: "Read from beginning when initial_read_offset set to beginning", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + InitialReadOffset: "beginning", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 0, + }, + }, + { + name: "Read from end when initial_read_offset set to end", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + InitialReadOffset: "end", + expected: &tail.SeekInfo{ + Whence: 2, + Offset: 0, + }, + }, + { + name: "Read from end when offset not exists and initial_read_offset set to save-or-end", + offsets: map[string]int64{}, + file: "test.log", + InitialReadOffset: "save-or-end", + expected: &tail.SeekInfo{ + Whence: 2, + Offset: 0, + }, + }, + { + name: "Read from offset when offset exists and initial_read_offset set to save-or-end", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + InitialReadOffset: "save-or-end", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 100, + }, + }, + { + name: "Read from start when offset not exists and initial_read_offset set to save-offset-or-start", + offsets: map[string]int64{}, + file: "test.log", + InitialReadOffset: "save-or-beginning", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 0, + }, + }, + { + name: "Read from offset when offset exists and initial_read_offset set to save-or-end", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + InitialReadOffset: "save-or-beginning", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 100, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + logger := &testutil.CaptureLogger{} + tt := newTail() + tt.Log = logger + tt.InitialReadOffset = test.InitialReadOffset + + require.NoError(t, tt.Init()) + tt.offsets = test.offsets + + seekInfo, err := tt.getSeekInfo(test.file) + require.NoError(t, err) + require.Equal(t, test.expected, seekInfo) + }) + } + + t.Run("Return error when initial_read_offset is invalid", func(t *testing.T) { + logger := &testutil.CaptureLogger{} + tt := newTail() + tt.Log = logger + tt.InitialReadOffset = "invalid" + + require.NoError(t, tt.Init()) + _, err := tt.getSeekInfo("test.log") + require.Error(t, err) + }) +} + +func TestSetInitialValueForInitialReadOffset(t *testing.T) { + tests := []struct { + name string + InitialReadOffset string + FromBeginning bool + expected string + }{ + { + name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set", + FromBeginning: true, + expected: "beginning", + }, + { + name: "Set InitialReadOffset to save-or-end when from_beginning set to false and initial_read_offset not set", + expected: "save-or-end", + }, + { + name: "Ignore from_beginning when initial_read_offset is set", + InitialReadOffset: "end", + expected: "end", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tt := newTail() + tt.FromBeginning = test.FromBeginning + tt.InitialReadOffset = test.InitialReadOffset + require.NoError(t, tt.Init()) + require.Equal(t, test.expected, tt.InitialReadOffset) + }) + } +} + +func TestInitInitialReadOffset(t *testing.T) { + tests := []struct { + name string + InitialReadOffset string + FromBeginning bool + expected string + }{ + { + name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set", + FromBeginning: true, + expected: "beginning", + }, + { + name: "Ignore from_beginning when initial_read_offset is set", + FromBeginning: true, + InitialReadOffset: "end", + expected: "end", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tt := newTail() + tt.FromBeginning = test.FromBeginning + tt.InitialReadOffset = test.InitialReadOffset + require.NoError(t, tt.Init()) + require.Equal(t, test.expected, tt.InitialReadOffset) + }) + } +}