diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index d4c18afd4..46b268f78 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -8,6 +8,7 @@ import ( "context" _ "embed" "errors" + "fmt" "io" "strings" "sync" @@ -77,6 +78,24 @@ func (t *Tail) SetParserFunc(fn telegraf.ParserFunc) { } func (t *Tail) Init() error { + // Backward compatibility setting + if t.InitialReadOffset == "" { + if t.FromBeginning { + t.InitialReadOffset = "beginning" + } else { + t.InitialReadOffset = "saved-or-end" + } + } + + // Check settings + switch t.InitialReadOffset { + case "": + t.InitialReadOffset = "saved-or-end" + case "beginning", "end", "saved-or-end", "saved-or-beginning": + default: + return fmt.Errorf("invalid 'initial_read_offset' setting %q", t.InitialReadOffset) + } + if t.MaxUndeliveredLines == 0 { return errors.New("max_undelivered_lines must be positive") } @@ -87,20 +106,17 @@ func (t *Tail) Init() error { t.filterColors = true } } + // init offsets t.offsets = make(map[string]int64) - if t.InitialReadOffset == "" { - if t.FromBeginning { - t.InitialReadOffset = "beginning" - } else { - t.InitialReadOffset = "saved-or-end" - } + dec, err := encoding.NewDecoder(t.CharacterEncoding) + if err != nil { + return fmt.Errorf("creating decoder failed: %w", err) } + t.decoder = dec - var err error - t.decoder, err = encoding.NewDecoder(t.CharacterEncoding) - return err + return nil } func (t *Tail) Start(acc telegraf.Accumulator) error { @@ -144,6 +160,12 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { } func (t *Tail) getSeekInfo(file string) (*tail.SeekInfo, error) { + // Pipes do not support seeking + if t.Pipe { + return nil, nil + } + + // Determine the actual position for continuing switch t.InitialReadOffset { case "beginning": return &tail.SeekInfo{Whence: 0, Offset: 0}, nil @@ -426,7 +448,6 @@ func newTail() *Tail { offsetsMutex.Unlock() return &Tail{ - FromBeginning: false, MaxUndeliveredLines: 1000, offsets: offsetsCopy, PathTag: "path", diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 8c922228f..8614b5372 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -800,67 +800,74 @@ func TestStatePersistence(t *testing.T) { func TestGetSeekInfo(t *testing.T) { tests := []struct { - name string - offsets map[string]int64 - file string - InitialReadOffset string - expected *tail.SeekInfo + name string + offsets map[string]int64 + initial string + expected *tail.SeekInfo }{ { - name: "Read from beginning when initial_read_offset set to beginning", - offsets: map[string]int64{"test.log": 100}, - file: "test.log", - InitialReadOffset: "beginning", + name: "beginning without offset", + initial: "beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 0, }, }, { - name: "Read from end when initial_read_offset set to end", - offsets: map[string]int64{"test.log": 100}, - file: "test.log", - InitialReadOffset: "end", + name: "beginning with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "beginning", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 0, + }, + }, + { + name: "end without offset", + initial: "end", expected: &tail.SeekInfo{ Whence: 2, Offset: 0, }, }, { - name: "Read from end when offset not exists and initial_read_offset set to saved-or-end", - offsets: map[string]int64{}, - file: "test.log", - InitialReadOffset: "saved-or-end", + name: "end with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "end", expected: &tail.SeekInfo{ Whence: 2, Offset: 0, }, }, { - name: "Read from offset when offset exists and initial_read_offset set to saved-or-end", - offsets: map[string]int64{"test.log": 100}, - file: "test.log", - InitialReadOffset: "saved-or-end", + name: "saved-or-beginning without offset", + initial: "saved-or-beginning", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 0, + }, + }, + { + name: "saved-or-beginning with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "saved-or-beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 100, }, }, { - name: "Read from start when offset not exists and initial_read_offset set to save-offset-or-start", - offsets: map[string]int64{}, - file: "test.log", - InitialReadOffset: "saved-or-beginning", + name: "saved-or-end without offset", + initial: "saved-or-end", expected: &tail.SeekInfo{ - Whence: 0, + Whence: 2, Offset: 0, }, }, { - name: "Read from offset when offset exists and initial_read_offset set to saved-or-end", - offsets: map[string]int64{"test.log": 100}, - file: "test.log", - InitialReadOffset: "saved-or-beginning", + name: "saved-or-end with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "saved-or-end", expected: &tail.SeekInfo{ Whence: 0, Offset: 100, @@ -868,32 +875,95 @@ func TestGetSeekInfo(t *testing.T) { }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - logger := &testutil.CaptureLogger{} - tt := newTail() - tt.Log = logger - tt.InitialReadOffset = test.InitialReadOffset + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &Tail{ + MaxUndeliveredLines: 1000, + InitialReadOffset: tt.initial, + PathTag: "path", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.offsets = tt.offsets - require.NoError(t, tt.Init()) - tt.offsets = test.offsets - - seekInfo, err := tt.getSeekInfo(test.file) + seekInfo, err := plugin.getSeekInfo("test.log") require.NoError(t, err) - require.Equal(t, test.expected, seekInfo) + require.Equal(t, tt.expected, seekInfo) }) } +} - t.Run("Return error when initial_read_offset is invalid", func(t *testing.T) { - logger := &testutil.CaptureLogger{} - tt := newTail() - tt.Log = logger - tt.InitialReadOffset = "invalid" +func TestGetSeekInfoForPipes(t *testing.T) { + tests := []struct { + name string + offsets map[string]int64 + initial string + }{ + { + name: "beginning without offset", + initial: "beginning", + }, + { + name: "beginning with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "beginning", + }, + { + name: "end without offset", + initial: "end", + }, + { + name: "end with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "end", + }, + { + name: "saved-or-end without offset", + initial: "saved-or-end", + }, + { + name: "saved-or-end with offset", + offsets: map[string]int64{"test.log": 100}, + initial: "saved-or-end", + }, + { + name: "saved-or-beginning without offset", + initial: "saved-or-beginning", + }, + { + name: "saved-or-beginning with offset", + initial: "saved-or-beginning", + offsets: map[string]int64{"test.log": 100}, + }, + } - require.NoError(t, tt.Init()) - _, err := tt.getSeekInfo("test.log") - require.Error(t, err) - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &Tail{ + InitialReadOffset: tt.initial, + MaxUndeliveredLines: 1000, + PathTag: "path", + Pipe: true, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.offsets = tt.offsets + + seekInfo, err := plugin.getSeekInfo("test.log") + require.NoError(t, err) + require.Nil(t, seekInfo) + }) + } +} + +func TestInvalidInitialReadOffset(t *testing.T) { + plugin := &Tail{ + InitialReadOffset: "invalid", + MaxUndeliveredLines: 1000, + PathTag: "path", + Log: &testutil.Logger{}, + } + require.ErrorContains(t, plugin.Init(), "invalid 'initial_read_offset' setting") } func TestSetInitialValueForInitialReadOffset(t *testing.T) {