feat(inputs.docker_log): Add state-persistence capabilities (#12775)

This commit is contained in:
Sven Rebhan 2023-03-06 12:33:23 +01:00 committed by GitHub
parent 360edd52b6
commit 119a95dc72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 20 deletions

View File

@ -31,8 +31,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
# endpoint = "unix:///var/run/docker.sock"
## When true, container logs are read from the beginning; otherwise
## reading begins at the end of the log.
## When true, container logs are read from the beginning; otherwise reading
## begins at the end of the log. If state-persistence is enabled for Telegraf,
## the reading continues at the last previously processed timestamp.
# from_beginning = false
## Timeout for Docker API calls.

View File

@ -65,6 +65,11 @@ type DockerLogs struct {
wg sync.WaitGroup
mu sync.Mutex
containerList map[string]context.CancelFunc
// State of the plugin mapping container-ID to the timestamp of the
// last record processed
lastRecord map[string]time.Time
lastRecordMtx sync.Mutex
}
func (*DockerLogs) SampleConfig() string {
@ -116,6 +121,34 @@ func (d *DockerLogs) Init() error {
}
}
d.lastRecord = make(map[string]time.Time)
return nil
}
// State persistence interfaces
func (d *DockerLogs) GetState() interface{} {
d.lastRecordMtx.Lock()
recordOffsets := make(map[string]time.Time, len(d.lastRecord))
for k, v := range d.lastRecord {
recordOffsets[k] = v
}
d.lastRecordMtx.Unlock()
return recordOffsets
}
func (d *DockerLogs) SetState(state interface{}) error {
recordOffsets, ok := state.(map[string]time.Time)
if !ok {
return fmt.Errorf("state has wrong type %T", state)
}
d.lastRecordMtx.Lock()
for k, v := range recordOffsets {
d.lastRecord[k] = v
}
d.lastRecordMtx.Unlock()
return nil
}
@ -237,9 +270,13 @@ func (d *DockerLogs) tailContainerLogs(
return err
}
tail := "0"
if d.FromBeginning {
tail = "all"
since := time.Time{}.Format(time.RFC3339Nano)
if !d.FromBeginning {
d.lastRecordMtx.Lock()
if ts, ok := d.lastRecord[container.ID]; ok {
since = ts.Format(time.RFC3339Nano)
}
d.lastRecordMtx.Unlock()
}
logOptions := types.ContainerLogsOptions{
@ -248,7 +285,7 @@ func (d *DockerLogs) tailContainerLogs(
Timestamps: true,
Details: false,
Follow: true,
Tail: tail,
Since: since,
}
logReader, err := d.client.ContainerLogs(ctx, container.ID, logOptions)
@ -262,10 +299,23 @@ func (d *DockerLogs) tailContainerLogs(
//
// If the container is *not* using a TTY, streams for stdout and stderr are
// multiplexed.
var last time.Time
if hasTTY {
return tailStream(acc, tags, container.ID, logReader, "tty")
last, err = tailStream(acc, tags, container.ID, logReader, "tty")
} else {
last, err = tailMultiplexed(acc, tags, container.ID, logReader)
}
return tailMultiplexed(acc, tags, container.ID, logReader)
if err != nil {
return err
}
if ts, ok := d.lastRecord[container.ID]; !ok || ts.Before(last) {
d.lastRecordMtx.Lock()
d.lastRecord[container.ID] = last
d.lastRecordMtx.Unlock()
}
return nil
}
func parseLine(line []byte) (time.Time, string, error) {
@ -297,7 +347,7 @@ func tailStream(
containerID string,
reader io.ReadCloser,
stream string,
) error {
) (time.Time, error) {
defer reader.Close()
tags := make(map[string]string, len(baseTags)+1)
@ -308,6 +358,7 @@ func tailStream(
r := bufio.NewReaderSize(reader, 64*1024)
var lastTs time.Time
for {
line, err := r.ReadBytes('\n')
@ -321,13 +372,18 @@ func tailStream(
"message": message,
}, tags, ts)
}
// Store the last processed timestamp
if ts.After(lastTs) {
lastTs = ts
}
}
if err != nil {
if err == io.EOF {
return nil
return lastTs, nil
}
return err
return time.Time{}, err
}
}
}
@ -337,15 +393,17 @@ func tailMultiplexed(
tags map[string]string,
containerID string,
src io.ReadCloser,
) error {
) (time.Time, error) {
outReader, outWriter := io.Pipe()
errReader, errWriter := io.Pipe()
var tsStdout, tsStderr time.Time
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := tailStream(acc, tags, containerID, outReader, "stdout")
var err error
tsStdout, err = tailStream(acc, tags, containerID, outReader, "stdout")
if err != nil {
acc.AddError(err)
}
@ -354,18 +412,28 @@ func tailMultiplexed(
wg.Add(1)
go func() {
defer wg.Done()
err := tailStream(acc, tags, containerID, errReader, "stderr")
var err error
tsStderr, err = tailStream(acc, tags, containerID, errReader, "stderr")
if err != nil {
acc.AddError(err)
}
}()
_, err := stdcopy.StdCopy(outWriter, errWriter, src)
outWriter.Close() //nolint:revive // we cannot do anything if the closing fails
errWriter.Close() //nolint:revive // we cannot do anything if the closing fails
src.Close() //nolint:revive // we cannot do anything if the closing fails
// Ignore the returned errors as we cannot do anything if the closing fails
_ = outWriter.Close()
_ = errWriter.Close()
_ = src.Close()
wg.Wait()
return err
if err != nil {
return time.Time{}, err
}
if tsStdout.After(tsStderr) {
return tsStdout, nil
}
return tsStderr, nil
}
// Start is a noop which is required for a *DockerLogs to implement

View File

@ -5,8 +5,9 @@
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
# endpoint = "unix:///var/run/docker.sock"
## When true, container logs are read from the beginning; otherwise
## reading begins at the end of the log.
## When true, container logs are read from the beginning; otherwise reading
## begins at the end of the log. If state-persistence is enabled for Telegraf,
## the reading continues at the last previously processed timestamp.
# from_beginning = false
## Timeout for Docker API calls.