fix(inputs.tail): Do not seek on pipes (#16674)

This commit is contained in:
Sven Rebhan 2025-03-24 16:04:49 +01:00 committed by GitHub
parent 55f471aa32
commit 15b92f38c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 151 additions and 60 deletions

View File

@ -8,6 +8,7 @@ import (
"context" "context"
_ "embed" _ "embed"
"errors" "errors"
"fmt"
"io" "io"
"strings" "strings"
"sync" "sync"
@ -77,6 +78,24 @@ func (t *Tail) SetParserFunc(fn telegraf.ParserFunc) {
} }
func (t *Tail) Init() error { func (t *Tail) Init() error {
// Backward compatibility setting
if t.InitialReadOffset == "" {
if t.FromBeginning {
t.InitialReadOffset = "beginning"
} else {
t.InitialReadOffset = "saved-or-end"
}
}
// Check settings
switch t.InitialReadOffset {
case "":
t.InitialReadOffset = "saved-or-end"
case "beginning", "end", "saved-or-end", "saved-or-beginning":
default:
return fmt.Errorf("invalid 'initial_read_offset' setting %q", t.InitialReadOffset)
}
if t.MaxUndeliveredLines == 0 { if t.MaxUndeliveredLines == 0 {
return errors.New("max_undelivered_lines must be positive") return errors.New("max_undelivered_lines must be positive")
} }
@ -87,20 +106,17 @@ func (t *Tail) Init() error {
t.filterColors = true t.filterColors = true
} }
} }
// init offsets // init offsets
t.offsets = make(map[string]int64) t.offsets = make(map[string]int64)
if t.InitialReadOffset == "" { dec, err := encoding.NewDecoder(t.CharacterEncoding)
if t.FromBeginning { if err != nil {
t.InitialReadOffset = "beginning" return fmt.Errorf("creating decoder failed: %w", err)
} else {
t.InitialReadOffset = "saved-or-end"
}
} }
t.decoder = dec
var err error return nil
t.decoder, err = encoding.NewDecoder(t.CharacterEncoding)
return err
} }
func (t *Tail) Start(acc telegraf.Accumulator) error { func (t *Tail) Start(acc telegraf.Accumulator) error {
@ -144,6 +160,12 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
} }
func (t *Tail) getSeekInfo(file string) (*tail.SeekInfo, error) { func (t *Tail) getSeekInfo(file string) (*tail.SeekInfo, error) {
// Pipes do not support seeking
if t.Pipe {
return nil, nil
}
// Determine the actual position for continuing
switch t.InitialReadOffset { switch t.InitialReadOffset {
case "beginning": case "beginning":
return &tail.SeekInfo{Whence: 0, Offset: 0}, nil return &tail.SeekInfo{Whence: 0, Offset: 0}, nil
@ -426,7 +448,6 @@ func newTail() *Tail {
offsetsMutex.Unlock() offsetsMutex.Unlock()
return &Tail{ return &Tail{
FromBeginning: false,
MaxUndeliveredLines: 1000, MaxUndeliveredLines: 1000,
offsets: offsetsCopy, offsets: offsetsCopy,
PathTag: "path", PathTag: "path",

View File

@ -800,67 +800,74 @@ func TestStatePersistence(t *testing.T) {
func TestGetSeekInfo(t *testing.T) { func TestGetSeekInfo(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
offsets map[string]int64 offsets map[string]int64
file string initial string
InitialReadOffset string expected *tail.SeekInfo
expected *tail.SeekInfo
}{ }{
{ {
name: "Read from beginning when initial_read_offset set to beginning", name: "beginning without offset",
offsets: map[string]int64{"test.log": 100}, initial: "beginning",
file: "test.log",
InitialReadOffset: "beginning",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 0, Whence: 0,
Offset: 0, Offset: 0,
}, },
}, },
{ {
name: "Read from end when initial_read_offset set to end", name: "beginning with offset",
offsets: map[string]int64{"test.log": 100}, offsets: map[string]int64{"test.log": 100},
file: "test.log", initial: "beginning",
InitialReadOffset: "end", expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "end without offset",
initial: "end",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 2, Whence: 2,
Offset: 0, Offset: 0,
}, },
}, },
{ {
name: "Read from end when offset not exists and initial_read_offset set to saved-or-end", name: "end with offset",
offsets: map[string]int64{}, offsets: map[string]int64{"test.log": 100},
file: "test.log", initial: "end",
InitialReadOffset: "saved-or-end",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 2, Whence: 2,
Offset: 0, Offset: 0,
}, },
}, },
{ {
name: "Read from offset when offset exists and initial_read_offset set to saved-or-end", name: "saved-or-beginning without offset",
offsets: map[string]int64{"test.log": 100}, initial: "saved-or-beginning",
file: "test.log", expected: &tail.SeekInfo{
InitialReadOffset: "saved-or-end", Whence: 0,
Offset: 0,
},
},
{
name: "saved-or-beginning with offset",
offsets: map[string]int64{"test.log": 100},
initial: "saved-or-beginning",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 0, Whence: 0,
Offset: 100, Offset: 100,
}, },
}, },
{ {
name: "Read from start when offset not exists and initial_read_offset set to save-offset-or-start", name: "saved-or-end without offset",
offsets: map[string]int64{}, initial: "saved-or-end",
file: "test.log",
InitialReadOffset: "saved-or-beginning",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 0, Whence: 2,
Offset: 0, Offset: 0,
}, },
}, },
{ {
name: "Read from offset when offset exists and initial_read_offset set to saved-or-end", name: "saved-or-end with offset",
offsets: map[string]int64{"test.log": 100}, offsets: map[string]int64{"test.log": 100},
file: "test.log", initial: "saved-or-end",
InitialReadOffset: "saved-or-beginning",
expected: &tail.SeekInfo{ expected: &tail.SeekInfo{
Whence: 0, Whence: 0,
Offset: 100, Offset: 100,
@ -868,32 +875,95 @@ func TestGetSeekInfo(t *testing.T) {
}, },
} }
for _, test := range tests { for _, tt := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
logger := &testutil.CaptureLogger{} plugin := &Tail{
tt := newTail() MaxUndeliveredLines: 1000,
tt.Log = logger InitialReadOffset: tt.initial,
tt.InitialReadOffset = test.InitialReadOffset PathTag: "path",
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())
plugin.offsets = tt.offsets
require.NoError(t, tt.Init()) seekInfo, err := plugin.getSeekInfo("test.log")
tt.offsets = test.offsets
seekInfo, err := tt.getSeekInfo(test.file)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, test.expected, seekInfo) require.Equal(t, tt.expected, seekInfo)
}) })
} }
}
t.Run("Return error when initial_read_offset is invalid", func(t *testing.T) { func TestGetSeekInfoForPipes(t *testing.T) {
logger := &testutil.CaptureLogger{} tests := []struct {
tt := newTail() name string
tt.Log = logger offsets map[string]int64
tt.InitialReadOffset = "invalid" initial string
}{
{
name: "beginning without offset",
initial: "beginning",
},
{
name: "beginning with offset",
offsets: map[string]int64{"test.log": 100},
initial: "beginning",
},
{
name: "end without offset",
initial: "end",
},
{
name: "end with offset",
offsets: map[string]int64{"test.log": 100},
initial: "end",
},
{
name: "saved-or-end without offset",
initial: "saved-or-end",
},
{
name: "saved-or-end with offset",
offsets: map[string]int64{"test.log": 100},
initial: "saved-or-end",
},
{
name: "saved-or-beginning without offset",
initial: "saved-or-beginning",
},
{
name: "saved-or-beginning with offset",
initial: "saved-or-beginning",
offsets: map[string]int64{"test.log": 100},
},
}
require.NoError(t, tt.Init()) for _, tt := range tests {
_, err := tt.getSeekInfo("test.log") t.Run(tt.name, func(t *testing.T) {
require.Error(t, err) plugin := &Tail{
}) InitialReadOffset: tt.initial,
MaxUndeliveredLines: 1000,
PathTag: "path",
Pipe: true,
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())
plugin.offsets = tt.offsets
seekInfo, err := plugin.getSeekInfo("test.log")
require.NoError(t, err)
require.Nil(t, seekInfo)
})
}
}
func TestInvalidInitialReadOffset(t *testing.T) {
plugin := &Tail{
InitialReadOffset: "invalid",
MaxUndeliveredLines: 1000,
PathTag: "path",
Log: &testutil.Logger{},
}
require.ErrorContains(t, plugin.Init(), "invalid 'initial_read_offset' setting")
} }
func TestSetInitialValueForInitialReadOffset(t *testing.T) { func TestSetInitialValueForInitialReadOffset(t *testing.T) {