feat(input.tail): Add `initial_read_offset` config for controlling read behavior (#16342)

This commit is contained in:
clas 2025-01-28 03:46:36 +08:00 committed by GitHub
parent 0e1e30f93a
commit 2f6d58b94a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 240 additions and 41 deletions

View File

@ -12,7 +12,7 @@ tail -F --lines=0 myfile.log
that it will be compatible with log-rotated files, and that it will retry on
inaccessible files.
- `--lines=0` means that it will start at the end of the file (unless
the `from_beginning` option is set).
the `initial_read_offset` option is set).
see <http://man7.org/linux/man-pages/man1/tail.1.html> for more details.
@ -56,8 +56,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
##
files = ["/var/mymetrics.out"]
## Read file from beginning.
# from_beginning = false
## Offset to start reading at
## The following methods are available:
## beginning -- start reading from the beginning of the file ignoring any persisted offset
## end -- start reading from the end of the file ignoring any persisted offset
## saved-or-beginning -- use the persisted offset of the file or, if no offset persisted, start from the beginning of the file
## saved-or-end -- use the persisted offset of the file or, if no offset persisted, start from the end of the file
# initial_read_offset = "saved-or-end"
## Whether file is a named pipe
# pipe = false

View File

@ -12,8 +12,13 @@
##
files = ["/var/mymetrics.out"]
## Read file from beginning.
# from_beginning = false
## Offset to start reading at
## The following methods are available:
## beginning -- start reading from the beginning of the file ignoring any persisted offset
## end -- start reading from the end of the file ignoring any persisted offset
## saved-or-beginning -- use the persisted offset of the file or, if no offset persisted, start from the beginning of the file
## saved-or-end -- use the persisted offset of the file or, if no offset persisted, start from the end of the file
# initial_read_offset = "saved-or-end"
## Whether file is a named pipe
# pipe = false

View File

@ -37,7 +37,8 @@ var (
type Tail struct {
Files []string `toml:"files"`
FromBeginning bool `toml:"from_beginning"`
FromBeginning bool `toml:"from_beginning" deprecated:"1.34.0;1.40.0;use 'initial_read_offset' with value 'beginning' instead"`
InitialReadOffset string `toml:"initial_read_offset"`
Pipe bool `toml:"pipe"`
WatchMethod string `toml:"watch_method"`
MaxUndeliveredLines int `toml:"max_undelivered_lines"`
@ -89,6 +90,14 @@ func (t *Tail) Init() error {
// init offsets
t.offsets = make(map[string]int64)
if t.InitialReadOffset == "" {
if t.FromBeginning {
t.InitialReadOffset = "beginning"
} else {
t.InitialReadOffset = "save-or-end"
}
}
var err error
t.decoder, err = encoding.NewDecoder(t.CharacterEncoding)
return err
@ -121,7 +130,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
t.tailers = make(map[string]*tail.Tail)
err = t.tailNewFiles(t.FromBeginning)
err = t.tailNewFiles()
if err != nil {
return err
}
// assumption that once Start is called, all parallel plugins have already been initialized
offsetsMutex.Lock()
@ -131,6 +143,31 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
return err
}
func (t *Tail) getSeekInfo(file string) (*tail.SeekInfo, error) {
switch t.InitialReadOffset {
case "beginning":
return &tail.SeekInfo{Whence: 0, Offset: 0}, nil
case "end":
return &tail.SeekInfo{Whence: 2, Offset: 0}, nil
case "", "save-or-end":
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
return &tail.SeekInfo{Whence: 0, Offset: offset}, nil
} else {
return &tail.SeekInfo{Whence: 2, Offset: 0}, nil
}
case "save-or-beginning":
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
return &tail.SeekInfo{Whence: 0, Offset: offset}, nil
} else {
return &tail.SeekInfo{Whence: 0, Offset: 0}, nil
}
default:
return nil, errors.New("invalid 'initial_read_offset' setting")
}
}
func (t *Tail) GetState() interface{} {
return t.offsets
}
@ -147,12 +184,12 @@ func (t *Tail) SetState(state interface{}) error {
}
func (t *Tail) Gather(_ telegraf.Accumulator) error {
return t.tailNewFiles(true)
return t.tailNewFiles()
}
func (t *Tail) Stop() {
for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
if !t.Pipe {
// store offset for resume
offset, err := tailer.Tell()
if err == nil {
@ -179,7 +216,7 @@ func (t *Tail) Stop() {
offsetsMutex.Unlock()
}
func (t *Tail) tailNewFiles(fromBeginning bool) error {
func (t *Tail) tailNewFiles() error {
var poll bool
if t.WatchMethod == "poll" {
poll = true
@ -197,20 +234,9 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
continue
}
var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
seek, err := t.getSeekInfo(file)
if err != nil {
return err
}
tailer, err := tail.TailFile(file,

View File

@ -11,6 +11,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/tail"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
@ -43,7 +44,6 @@ func newTestTail() *Tail {
}
return &Tail{
FromBeginning: false,
MaxUndeliveredLines: 1000,
offsets: offsetsCopy,
WatchMethod: watchMethod,
@ -64,7 +64,7 @@ cpu usage_idle=100
tt := newTestTail()
tt.Log = logger
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{tmpfile}
tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
@ -88,7 +88,7 @@ func TestColoredLine(t *testing.T) {
tt := newTestTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Filters = []string{"ansi_color"}
tt.Files = []string{tmpfile}
tt.SetParserFunc(newInfluxParser)
@ -118,7 +118,7 @@ func TestTailDosLineEndings(t *testing.T) {
tt := newTestTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{tmpfile}
tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
@ -146,7 +146,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) {
duration := config.Duration(d)
tt := newTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{filepath.Join("testdata", "test_multiline.log")}
tt.MultilineConfig = multilineConfig{
Pattern: `^[^\[]`,
@ -210,7 +210,7 @@ func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {
tt := newTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{tmpfile.Name()}
tt.MultilineConfig = multilineConfig{
Pattern: `^[^\[]`,
@ -261,7 +261,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test
tt := newTestTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{filepath.Join("testdata", "test_multiline.log")}
tt.MultilineConfig = multilineConfig{
Pattern: `^[^\[]`,
@ -314,7 +314,7 @@ cpu,42
plugin := newTestTail()
plugin.Log = testutil.Logger{}
plugin.FromBeginning = true
plugin.InitialReadOffset = "beginning"
plugin.Files = []string{tmpfile}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := csv.Parser{
@ -388,7 +388,7 @@ skip2,mem,100
plugin := newTestTail()
plugin.Log = testutil.Logger{}
plugin.FromBeginning = true
plugin.InitialReadOffset = "beginning"
plugin.Files = []string{tmpfile}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := csv.Parser{
@ -446,7 +446,7 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
plugin := newTestTail()
plugin.Log = testutil.Logger{}
plugin.FromBeginning = true
plugin.InitialReadOffset = "beginning"
plugin.Files = []string{tmpfile}
plugin.PathTag = "customPathTagMyFile"
plugin.SetParserFunc(func() (telegraf.Parser, error) {
@ -526,7 +526,7 @@ func TestCharacterEncoding(t *testing.T) {
tests := []struct {
name string
testfiles string
fromBeginning bool
initialReadOffset string
characterEncoding string
offset int64
expected []telegraf.Metric
@ -534,7 +534,7 @@ func TestCharacterEncoding(t *testing.T) {
{
name: "utf-8",
testfiles: "cpu-utf-8.influx",
fromBeginning: true,
initialReadOffset: "beginning",
characterEncoding: "utf-8",
expected: full,
},
@ -548,7 +548,7 @@ func TestCharacterEncoding(t *testing.T) {
{
name: "utf-16le",
testfiles: "cpu-utf-16le.influx",
fromBeginning: true,
initialReadOffset: "beginning",
characterEncoding: "utf-16le",
expected: full,
},
@ -562,7 +562,7 @@ func TestCharacterEncoding(t *testing.T) {
{
name: "utf-16be",
testfiles: "cpu-utf-16be.influx",
fromBeginning: true,
initialReadOffset: "beginning",
characterEncoding: "utf-16be",
expected: full,
},
@ -571,7 +571,7 @@ func TestCharacterEncoding(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
plugin := &Tail{
Files: []string{filepath.Join("testdata", tt.testfiles)},
FromBeginning: tt.fromBeginning,
InitialReadOffset: tt.initialReadOffset,
MaxUndeliveredLines: 1000,
Log: testutil.Logger{},
CharacterEncoding: tt.characterEncoding,
@ -615,7 +615,7 @@ func TestTailEOF(t *testing.T) {
tt := newTestTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.InitialReadOffset = "beginning"
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
@ -667,7 +667,7 @@ func TestCSVBehavior(t *testing.T) {
// Setup the plugin
plugin := &Tail{
Files: []string{input.Name()},
FromBeginning: true,
InitialReadOffset: "beginning",
MaxUndeliveredLines: 1000,
offsets: make(map[string]int64, 0),
PathTag: "path",
@ -800,3 +800,166 @@ func TestStatePersistence(t *testing.T) {
require.True(t, ok, "state is not a map[string]int64")
require.Equal(t, expectedState, actualState)
}
func TestGetSeekInfo(t *testing.T) {
tests := []struct {
name string
offsets map[string]int64
file string
InitialReadOffset string
expected *tail.SeekInfo
}{
{
name: "Read from beginning when initial_read_offset set to beginning",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
InitialReadOffset: "beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Read from end when initial_read_offset set to end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
InitialReadOffset: "end",
expected: &tail.SeekInfo{
Whence: 2,
Offset: 0,
},
},
{
name: "Read from end when offset not exists and initial_read_offset set to save-or-end",
offsets: map[string]int64{},
file: "test.log",
InitialReadOffset: "save-or-end",
expected: &tail.SeekInfo{
Whence: 2,
Offset: 0,
},
},
{
name: "Read from offset when offset exists and initial_read_offset set to save-or-end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
InitialReadOffset: "save-or-end",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
{
name: "Read from start when offset not exists and initial_read_offset set to save-offset-or-start",
offsets: map[string]int64{},
file: "test.log",
InitialReadOffset: "save-or-beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Read from offset when offset exists and initial_read_offset set to save-or-end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
InitialReadOffset: "save-or-beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger := &testutil.CaptureLogger{}
tt := newTail()
tt.Log = logger
tt.InitialReadOffset = test.InitialReadOffset
require.NoError(t, tt.Init())
tt.offsets = test.offsets
seekInfo, err := tt.getSeekInfo(test.file)
require.NoError(t, err)
require.Equal(t, test.expected, seekInfo)
})
}
t.Run("Return error when initial_read_offset is invalid", func(t *testing.T) {
logger := &testutil.CaptureLogger{}
tt := newTail()
tt.Log = logger
tt.InitialReadOffset = "invalid"
require.NoError(t, tt.Init())
_, err := tt.getSeekInfo("test.log")
require.Error(t, err)
})
}
func TestSetInitialValueForInitialReadOffset(t *testing.T) {
tests := []struct {
name string
InitialReadOffset string
FromBeginning bool
expected string
}{
{
name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set",
FromBeginning: true,
expected: "beginning",
},
{
name: "Set InitialReadOffset to save-or-end when from_beginning set to false and initial_read_offset not set",
expected: "save-or-end",
},
{
name: "Ignore from_beginning when initial_read_offset is set",
InitialReadOffset: "end",
expected: "end",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tt := newTail()
tt.FromBeginning = test.FromBeginning
tt.InitialReadOffset = test.InitialReadOffset
require.NoError(t, tt.Init())
require.Equal(t, test.expected, tt.InitialReadOffset)
})
}
}
func TestInitInitialReadOffset(t *testing.T) {
tests := []struct {
name string
InitialReadOffset string
FromBeginning bool
expected string
}{
{
name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set",
FromBeginning: true,
expected: "beginning",
},
{
name: "Ignore from_beginning when initial_read_offset is set",
FromBeginning: true,
InitialReadOffset: "end",
expected: "end",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tt := newTail()
tt.FromBeginning = test.FromBeginning
tt.InitialReadOffset = test.InitialReadOffset
require.NoError(t, tt.Init())
require.Equal(t, test.expected, tt.InitialReadOffset)
})
}
}