chore: resolve linter issues for unhandled-error (#11969)
This commit is contained in:
parent
c044088313
commit
5878278fca
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/internal/goplugin"
|
"github.com/influxdata/telegraf/internal/goplugin"
|
||||||
|
|
@ -19,7 +21,6 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/all"
|
_ "github.com/influxdata/telegraf/plugins/outputs/all"
|
||||||
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
_ "github.com/influxdata/telegraf/plugins/parsers/all"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/all"
|
_ "github.com/influxdata/telegraf/plugins/processors/all"
|
||||||
"github.com/urfave/cli/v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TelegrafConfig interface {
|
type TelegrafConfig interface {
|
||||||
|
|
@ -118,14 +119,17 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
|
|
||||||
// This function is used when Telegraf is run with only flags
|
// This function is used when Telegraf is run with only flags
|
||||||
action := func(cCtx *cli.Context) error {
|
action := func(cCtx *cli.Context) error {
|
||||||
logger.SetupLogging(logger.LogConfig{})
|
err := logger.SetupLogging(logger.LogConfig{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Deprecated: Use execd instead
|
// Deprecated: Use execd instead
|
||||||
// Load external plugins, if requested.
|
// Load external plugins, if requested.
|
||||||
if cCtx.String("plugin-directory") != "" {
|
if cCtx.String("plugin-directory") != "" {
|
||||||
log.Printf("I! Loading external plugins from: %s", cCtx.String("plugin-directory"))
|
log.Printf("I! Loading external plugins from: %s", cCtx.String("plugin-directory"))
|
||||||
if err := goplugin.LoadExternalPlugins(cCtx.String("plugin-directory")); err != nil {
|
if err := goplugin.LoadExternalPlugins(cCtx.String("plugin-directory")); err != nil {
|
||||||
return fmt.Errorf("E! %w", err)
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,7 +179,7 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
err := PrintInputConfig(cCtx.String("usage"), outputBuffer)
|
err := PrintInputConfig(cCtx.String("usage"), outputBuffer)
|
||||||
err2 := PrintOutputConfig(cCtx.String("usage"), outputBuffer)
|
err2 := PrintOutputConfig(cCtx.String("usage"), outputBuffer)
|
||||||
if err != nil && err2 != nil {
|
if err != nil && err2 != nil {
|
||||||
return fmt.Errorf("E! %s and %s", err, err2)
|
return fmt.Errorf("%s and %s", err, err2)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
// DEPRECATED
|
// DEPRECATED
|
||||||
|
|
@ -311,7 +315,7 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
Name: "sample-config",
|
Name: "sample-config",
|
||||||
Usage: "DEPRECATED: print out full sample configuration",
|
Usage: "DEPRECATED: print out full sample configuration",
|
||||||
},
|
},
|
||||||
// Using execd plugin to add external plugins is preffered (less size impact, easier for end user)
|
// Using execd plugin to add external plugins is preferred (less size impact, easier for end user)
|
||||||
&cli.StringFlag{
|
&cli.StringFlag{
|
||||||
Name: "plugin-directory",
|
Name: "plugin-directory",
|
||||||
Usage: "DEPRECATED: path to directory containing external plugins",
|
Usage: "DEPRECATED: path to directory containing external plugins",
|
||||||
|
|
|
||||||
|
|
@ -9,12 +9,13 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MockTelegraf struct {
|
type MockTelegraf struct {
|
||||||
|
|
@ -80,7 +81,7 @@ func TestUsageFlag(t *testing.T) {
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
PluginName: "example",
|
PluginName: "example",
|
||||||
ExpectedError: "E! input example not found and output example not found",
|
ExpectedError: "input example not found and output example not found",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
PluginName: "temp",
|
PluginName: "temp",
|
||||||
|
|
@ -187,7 +188,7 @@ func TestPluginDirectoryFlag(t *testing.T) {
|
||||||
args := os.Args[0:1]
|
args := os.Args[0:1]
|
||||||
args = append(args, "--plugin-directory", ".")
|
args = append(args, "--plugin-directory", ".")
|
||||||
err := runApp(args, buf, NewMockServer(), NewMockConfig(buf), NewMockTelegraf())
|
err := runApp(args, buf, NewMockServer(), NewMockConfig(buf), NewMockTelegraf())
|
||||||
require.ErrorContains(t, err, "E! go plugin support is not enabled")
|
require.ErrorContains(t, err, "go plugin support is not enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCommandConfig(t *testing.T) {
|
func TestCommandConfig(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@ import (
|
||||||
"github.com/coreos/go-systemd/daemon"
|
"github.com/coreos/go-systemd/daemon"
|
||||||
"github.com/fatih/color"
|
"github.com/fatih/color"
|
||||||
"github.com/influxdata/tail/watch"
|
"github.com/influxdata/tail/watch"
|
||||||
|
"gopkg.in/tomb.v1"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/agent"
|
"github.com/influxdata/telegraf/agent"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
|
@ -24,7 +26,6 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/plugins/processors"
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
"gopkg.in/tomb.v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var stop chan struct{}
|
var stop chan struct{}
|
||||||
|
|
@ -187,18 +188,18 @@ func (t *Telegraf) runAgent(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !(t.test || t.testWait != 0) && len(c.Outputs) == 0 {
|
if !(t.test || t.testWait != 0) && len(c.Outputs) == 0 {
|
||||||
return errors.New("Error: no outputs found, did you provide a valid config file?")
|
return errors.New("no outputs found, did you provide a valid config file?")
|
||||||
}
|
}
|
||||||
if t.plugindDir == "" && len(c.Inputs) == 0 {
|
if t.plugindDir == "" && len(c.Inputs) == 0 {
|
||||||
return errors.New("Error: no inputs found, did you provide a valid config file?")
|
return errors.New("no inputs found, did you provide a valid config file?")
|
||||||
}
|
}
|
||||||
|
|
||||||
if int64(c.Agent.Interval) <= 0 {
|
if int64(c.Agent.Interval) <= 0 {
|
||||||
return fmt.Errorf("Agent interval must be positive, found %v", c.Agent.Interval)
|
return fmt.Errorf("agent interval must be positive, found %v", c.Agent.Interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
if int64(c.Agent.FlushInterval) <= 0 {
|
if int64(c.Agent.FlushInterval) <= 0 {
|
||||||
return fmt.Errorf("Agent flush_interval must be positive; found %v", c.Agent.Interval)
|
return fmt.Errorf("agent flush_interval must be positive; found %v", c.Agent.Interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup logging as configured.
|
// Setup logging as configured.
|
||||||
|
|
@ -214,7 +215,10 @@ func (t *Telegraf) runAgent(ctx context.Context) error {
|
||||||
LogWithTimezone: c.Agent.LogWithTimezone,
|
LogWithTimezone: c.Agent.LogWithTimezone,
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.SetupLogging(logConfig)
|
err = logger.SetupLogging(logConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("I! Starting Telegraf %s%s", internal.Version, internal.Customized)
|
log.Printf("I! Starting Telegraf %s%s", internal.Version, internal.Customized)
|
||||||
log.Printf("I! Available plugins: %d inputs, %d aggregators, %d processors, %d parsers, %d outputs",
|
log.Printf("I! Available plugins: %d inputs, %d aggregators, %d processors, %d parsers, %d outputs",
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/logger"
|
|
||||||
"github.com/kardianos/service"
|
"github.com/kardianos/service"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
func cliFlags() []cli.Flag {
|
func cliFlags() []cli.Flag {
|
||||||
|
|
@ -107,7 +108,7 @@ func (t *Telegraf) runAsWindowsService() error {
|
||||||
}
|
}
|
||||||
s, err := service.New(prg, svcConfig)
|
s, err := service.New(prg, svcConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("E! " + err.Error())
|
return err
|
||||||
}
|
}
|
||||||
// Handle the --service flag here to prevent any issues with tooling that
|
// Handle the --service flag here to prevent any issues with tooling that
|
||||||
// may not have an interactive session, e.g. installing from Ansible.
|
// may not have an interactive session, e.g. installing from Ansible.
|
||||||
|
|
@ -132,13 +133,17 @@ func (t *Telegraf) runAsWindowsService() error {
|
||||||
|
|
||||||
err := service.Control(s, t.service)
|
err := service.Control(s, t.service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("E! " + err.Error())
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog})
|
err = logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = s.Run()
|
err = s.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("E! " + err.Error())
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -77,11 +77,13 @@ func (p *Process) Start() error {
|
||||||
// Stop is called when the process isn't needed anymore
|
// Stop is called when the process isn't needed anymore
|
||||||
func (p *Process) Stop() {
|
func (p *Process) Stop() {
|
||||||
if p.cancel != nil {
|
if p.cancel != nil {
|
||||||
// signal our intent to shutdown and not restart the process
|
// signal our intent to shut down and not restart the process
|
||||||
p.cancel()
|
p.cancel()
|
||||||
}
|
}
|
||||||
// close stdin so the app can shut down gracefully.
|
// close stdin so the app can shut down gracefully.
|
||||||
p.Stdin.Close()
|
if err := p.Stdin.Close(); err != nil {
|
||||||
|
p.Log.Errorf("Stdin closed with message: %v", err)
|
||||||
|
}
|
||||||
p.mainLoopWg.Wait()
|
p.mainLoopWg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -176,7 +178,7 @@ func (p *Process) cmdWait(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
gracefulStop(processCtx, p.Cmd, 5*time.Second)
|
p.gracefulStop(processCtx, p.Cmd, 5*time.Second)
|
||||||
case <-processCtx.Done():
|
case <-processCtx.Done():
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
|
|
||||||
|
|
@ -9,15 +9,19 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
|
func (p *Process) gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
cmd.Process.Signal(syscall.SIGTERM)
|
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||||
|
p.Log.Errorf("Error after sending SIGTERM signal to process: %v", err)
|
||||||
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
cmd.Process.Kill()
|
if err := cmd.Process.Kill(); err != nil {
|
||||||
|
p.Log.Errorf("Error after killing process: %v", err)
|
||||||
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,10 +8,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
|
func (p *Process) gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
cmd.Process.Kill()
|
if err := cmd.Process.Kill(); err != nil {
|
||||||
|
p.Log.Errorf("Error after killing process: %v", err)
|
||||||
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,37 +1,44 @@
|
||||||
package syslog
|
package syslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFraming(t *testing.T) {
|
func TestFraming(t *testing.T) {
|
||||||
var f1 Framing
|
var f1 Framing
|
||||||
f1.UnmarshalTOML([]byte(`"non-transparent"`))
|
err := f1.UnmarshalTOML([]byte(`"non-transparent"`))
|
||||||
assert.Equal(t, NonTransparent, f1)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, NonTransparent, f1)
|
||||||
|
|
||||||
var f2 Framing
|
var f2 Framing
|
||||||
f2.UnmarshalTOML([]byte(`non-transparent`))
|
err = f2.UnmarshalTOML([]byte(`non-transparent`))
|
||||||
assert.Equal(t, NonTransparent, f2)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, NonTransparent, f2)
|
||||||
|
|
||||||
var f3 Framing
|
var f3 Framing
|
||||||
f3.UnmarshalTOML([]byte(`'non-transparent'`))
|
err = f3.UnmarshalTOML([]byte(`'non-transparent'`))
|
||||||
assert.Equal(t, NonTransparent, f3)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, NonTransparent, f3)
|
||||||
|
|
||||||
var f4 Framing
|
var f4 Framing
|
||||||
f4.UnmarshalTOML([]byte(`"octet-counting"`))
|
err = f4.UnmarshalTOML([]byte(`"octet-counting"`))
|
||||||
assert.Equal(t, OctetCounting, f4)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, OctetCounting, f4)
|
||||||
|
|
||||||
var f5 Framing
|
var f5 Framing
|
||||||
f5.UnmarshalTOML([]byte(`octet-counting`))
|
err = f5.UnmarshalTOML([]byte(`octet-counting`))
|
||||||
assert.Equal(t, OctetCounting, f5)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, OctetCounting, f5)
|
||||||
|
|
||||||
var f6 Framing
|
var f6 Framing
|
||||||
f6.UnmarshalTOML([]byte(`'octet-counting'`))
|
err = f6.UnmarshalTOML([]byte(`'octet-counting'`))
|
||||||
assert.Equal(t, OctetCounting, f6)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, OctetCounting, f6)
|
||||||
|
|
||||||
var f7 Framing
|
var f7 Framing
|
||||||
err := f7.UnmarshalTOML([]byte(`nope`))
|
err = f7.UnmarshalTOML([]byte(`nope`))
|
||||||
assert.Equal(t, Framing(-1), f7)
|
require.Error(t, err)
|
||||||
assert.Error(t, err)
|
require.Equal(t, Framing(-1), f7)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/sys/windows/svc/eventlog"
|
"golang.org/x/sys/windows/svc/eventlog"
|
||||||
)
|
)
|
||||||
|
|
@ -56,16 +55,18 @@ func TestEventLogIntegration(t *testing.T) {
|
||||||
Logfile: "",
|
Logfile: "",
|
||||||
}
|
}
|
||||||
|
|
||||||
SetupLogging(config)
|
err := SetupLogging(config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
log.Println("I! Info message")
|
log.Println("I! Info message")
|
||||||
log.Println("W! Warn message")
|
log.Println("W! Warn message")
|
||||||
log.Println("E! Err message")
|
log.Println("E! Err message")
|
||||||
events := getEventLog(t, now)
|
events := getEventLog(t, now)
|
||||||
assert.Len(t, events, 3)
|
require.Len(t, events, 3)
|
||||||
assert.Contains(t, events, Event{Message: "Info message", Level: Info})
|
require.Contains(t, events, Event{Message: "Info message", Level: Info})
|
||||||
assert.Contains(t, events, Event{Message: "Warn message", Level: Warning})
|
require.Contains(t, events, Event{Message: "Warn message", Level: Warning})
|
||||||
assert.Contains(t, events, Event{Message: "Err message", Level: Error})
|
require.Contains(t, events, Event{Message: "Err message", Level: Error})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRestrictedEventLogIntegration(t *testing.T) {
|
func TestRestrictedEventLogIntegration(t *testing.T) {
|
||||||
|
|
@ -79,7 +80,8 @@ func TestRestrictedEventLogIntegration(t *testing.T) {
|
||||||
Quiet: true,
|
Quiet: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
SetupLogging(config)
|
err := SetupLogging(config)
|
||||||
|
require.NoError(t, err)
|
||||||
//separate previous log messages by small delay
|
//separate previous log messages by small delay
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
@ -87,8 +89,8 @@ func TestRestrictedEventLogIntegration(t *testing.T) {
|
||||||
log.Println("W! Warning message")
|
log.Println("W! Warning message")
|
||||||
log.Println("E! Error message")
|
log.Println("E! Error message")
|
||||||
events := getEventLog(t, now)
|
events := getEventLog(t, now)
|
||||||
assert.Len(t, events, 1)
|
require.Len(t, events, 1)
|
||||||
assert.Contains(t, events, Event{Message: "Error message", Level: Error})
|
require.Contains(t, events, Event{Message: "Error message", Level: Error})
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareLogger(t *testing.T) {
|
func prepareLogger(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/wlog"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal/rotate"
|
"github.com/influxdata/telegraf/internal/rotate"
|
||||||
"github.com/influxdata/wlog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var prefixRegex = regexp.MustCompile("^[DIWE]!")
|
var prefixRegex = regexp.MustCompile("^[DIWE]!")
|
||||||
|
|
@ -31,7 +32,7 @@ type LogConfig struct {
|
||||||
LogTarget string
|
LogTarget string
|
||||||
// will direct the logging output to a file. Empty string is
|
// will direct the logging output to a file. Empty string is
|
||||||
// interpreted as stderr. If there is an error opening the file the
|
// interpreted as stderr. If there is an error opening the file the
|
||||||
// logger will fallback to stderr
|
// logger will fall back to stderr
|
||||||
Logfile string
|
Logfile string
|
||||||
// will rotate when current file at the specified time interval
|
// will rotate when current file at the specified time interval
|
||||||
RotationInterval config.Duration
|
RotationInterval config.Duration
|
||||||
|
|
@ -43,15 +44,15 @@ type LogConfig struct {
|
||||||
LogWithTimezone string
|
LogWithTimezone string
|
||||||
}
|
}
|
||||||
|
|
||||||
type LoggerCreator interface {
|
type creator interface {
|
||||||
CreateLogger(cfg LogConfig) (io.Writer, error)
|
CreateLogger(cfg LogConfig) (io.Writer, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var loggerRegistry map[string]LoggerCreator
|
var loggerRegistry map[string]creator
|
||||||
|
|
||||||
func registerLogger(name string, loggerCreator LoggerCreator) {
|
func registerLogger(name string, loggerCreator creator) {
|
||||||
if loggerRegistry == nil {
|
if loggerRegistry == nil {
|
||||||
loggerRegistry = make(map[string]LoggerCreator)
|
loggerRegistry = make(map[string]creator)
|
||||||
}
|
}
|
||||||
loggerRegistry[name] = loggerCreator
|
loggerRegistry[name] = loggerCreator
|
||||||
}
|
}
|
||||||
|
|
@ -110,8 +111,9 @@ func newTelegrafWriter(w io.Writer, c LogConfig) (io.Writer, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetupLogging configures the logging output.
|
// SetupLogging configures the logging output.
|
||||||
func SetupLogging(cfg LogConfig) {
|
func SetupLogging(cfg LogConfig) error {
|
||||||
newLogWriter(cfg)
|
_, err := newLogWriter(cfg)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type telegrafLogCreator struct {
|
type telegrafLogCreator struct {
|
||||||
|
|
@ -146,7 +148,7 @@ func (t *telegrafLogCreator) CreateLogger(cfg LogConfig) (io.Writer, error) {
|
||||||
// It allows closing previous writer if re-set and have possibility to test what is actually set
|
// It allows closing previous writer if re-set and have possibility to test what is actually set
|
||||||
var actualLogger io.Writer
|
var actualLogger io.Writer
|
||||||
|
|
||||||
func newLogWriter(cfg LogConfig) io.Writer {
|
func newLogWriter(cfg LogConfig) (io.Writer, error) {
|
||||||
log.SetFlags(0)
|
log.SetFlags(0)
|
||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
wlog.SetLevel(wlog.DEBUG)
|
wlog.SetLevel(wlog.DEBUG)
|
||||||
|
|
@ -166,12 +168,15 @@ func newLogWriter(cfg LogConfig) io.Writer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if closer, isCloser := actualLogger.(io.Closer); isCloser {
|
if closer, isCloser := actualLogger.(io.Closer); isCloser {
|
||||||
closer.Close()
|
if err := closer.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.SetOutput(logWriter)
|
log.SetOutput(logWriter)
|
||||||
actualLogger = logWriter
|
actualLogger = logWriter
|
||||||
|
|
||||||
return logWriter
|
return logWriter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -8,91 +8,100 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/config"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWriteLogToFile(t *testing.T) {
|
func TestWriteLogToFile(t *testing.T) {
|
||||||
tmpfile, err := os.CreateTemp("", "")
|
tmpfile, err := os.CreateTemp("", "")
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { os.Remove(tmpfile.Name()) }()
|
defer os.Remove(tmpfile.Name())
|
||||||
|
|
||||||
cfg := createBasicLogConfig(tmpfile.Name())
|
cfg := createBasicLogConfig(tmpfile.Name())
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
log.Printf("I! TEST")
|
log.Printf("I! TEST")
|
||||||
log.Printf("D! TEST") // <- should be ignored
|
log.Printf("D! TEST") // <- should be ignored
|
||||||
|
|
||||||
f, err := os.ReadFile(tmpfile.Name())
|
f, err := os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDebugWriteLogToFile(t *testing.T) {
|
func TestDebugWriteLogToFile(t *testing.T) {
|
||||||
tmpfile, err := os.CreateTemp("", "")
|
tmpfile, err := os.CreateTemp("", "")
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { os.Remove(tmpfile.Name()) }()
|
defer os.Remove(tmpfile.Name())
|
||||||
|
|
||||||
cfg := createBasicLogConfig(tmpfile.Name())
|
cfg := createBasicLogConfig(tmpfile.Name())
|
||||||
cfg.Debug = true
|
cfg.Debug = true
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
log.Printf("D! TEST")
|
log.Printf("D! TEST")
|
||||||
|
|
||||||
f, err := os.ReadFile(tmpfile.Name())
|
f, err := os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z D! TEST\n"))
|
require.Equal(t, f[19:], []byte("Z D! TEST\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrorWriteLogToFile(t *testing.T) {
|
func TestErrorWriteLogToFile(t *testing.T) {
|
||||||
tmpfile, err := os.CreateTemp("", "")
|
tmpfile, err := os.CreateTemp("", "")
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { os.Remove(tmpfile.Name()) }()
|
defer os.Remove(tmpfile.Name())
|
||||||
|
|
||||||
cfg := createBasicLogConfig(tmpfile.Name())
|
cfg := createBasicLogConfig(tmpfile.Name())
|
||||||
cfg.Quiet = true
|
cfg.Quiet = true
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
log.Printf("E! TEST")
|
log.Printf("E! TEST")
|
||||||
log.Printf("I! TEST") // <- should be ignored
|
log.Printf("I! TEST") // <- should be ignored
|
||||||
|
|
||||||
f, err := os.ReadFile(tmpfile.Name())
|
f, err := os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z E! TEST\n"))
|
require.Equal(t, f[19:], []byte("Z E! TEST\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddDefaultLogLevel(t *testing.T) {
|
func TestAddDefaultLogLevel(t *testing.T) {
|
||||||
tmpfile, err := os.CreateTemp("", "")
|
tmpfile, err := os.CreateTemp("", "")
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { os.Remove(tmpfile.Name()) }()
|
defer os.Remove(tmpfile.Name())
|
||||||
|
|
||||||
cfg := createBasicLogConfig(tmpfile.Name())
|
cfg := createBasicLogConfig(tmpfile.Name())
|
||||||
cfg.Debug = true
|
cfg.Debug = true
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
log.Printf("TEST")
|
log.Printf("TEST")
|
||||||
|
|
||||||
f, err := os.ReadFile(tmpfile.Name())
|
f, err := os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteToTruncatedFile(t *testing.T) {
|
func TestWriteToTruncatedFile(t *testing.T) {
|
||||||
tmpfile, err := os.CreateTemp("", "")
|
tmpfile, err := os.CreateTemp("", "")
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { os.Remove(tmpfile.Name()) }()
|
defer os.Remove(tmpfile.Name())
|
||||||
|
|
||||||
cfg := createBasicLogConfig(tmpfile.Name())
|
cfg := createBasicLogConfig(tmpfile.Name())
|
||||||
cfg.Debug = true
|
cfg.Debug = true
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
log.Printf("TEST")
|
log.Printf("TEST")
|
||||||
|
|
||||||
f, err := os.ReadFile(tmpfile.Name())
|
f, err := os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
require.Equal(t, f[19:], []byte("Z I! TEST\n"))
|
||||||
|
|
||||||
tmpf, err := os.OpenFile(tmpfile.Name(), os.O_RDWR|os.O_TRUNC, 0644)
|
tmpf, err := os.OpenFile(tmpfile.Name(), os.O_RDWR|os.O_TRUNC, 0644)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NoError(t, tmpf.Close())
|
require.NoError(t, tmpf.Close())
|
||||||
|
|
||||||
log.Printf("SHOULD BE FIRST")
|
log.Printf("SHOULD BE FIRST")
|
||||||
|
|
||||||
f, err = os.ReadFile(tmpfile.Name())
|
f, err = os.ReadFile(tmpfile.Name())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, f[19:], []byte("Z I! SHOULD BE FIRST\n"))
|
require.Equal(t, f[19:], []byte("Z I! SHOULD BE FIRST\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteToFileInRotation(t *testing.T) {
|
func TestWriteToFileInRotation(t *testing.T) {
|
||||||
|
|
@ -100,36 +109,40 @@ func TestWriteToFileInRotation(t *testing.T) {
|
||||||
cfg := createBasicLogConfig(filepath.Join(tempDir, "test.log"))
|
cfg := createBasicLogConfig(filepath.Join(tempDir, "test.log"))
|
||||||
cfg.LogTarget = LogTargetFile
|
cfg.LogTarget = LogTargetFile
|
||||||
cfg.RotationMaxSize = config.Size(30)
|
cfg.RotationMaxSize = config.Size(30)
|
||||||
writer := newLogWriter(cfg)
|
writer, err := newLogWriter(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
// Close the writer here, otherwise the temp folder cannot be deleted because the current log file is in use.
|
// Close the writer here, otherwise the temp folder cannot be deleted because the current log file is in use.
|
||||||
closer, isCloser := writer.(io.Closer)
|
closer, isCloser := writer.(io.Closer)
|
||||||
assert.True(t, isCloser)
|
require.True(t, isCloser)
|
||||||
t.Cleanup(func() { require.NoError(t, closer.Close()) })
|
t.Cleanup(func() { require.NoError(t, closer.Close()) })
|
||||||
|
|
||||||
log.Printf("I! TEST 1") // Writes 31 bytes, will rotate
|
log.Printf("I! TEST 1") // Writes 31 bytes, will rotate
|
||||||
log.Printf("I! TEST") // Writes 29 byes, no rotation expected
|
log.Printf("I! TEST") // Writes 29 byes, no rotation expected
|
||||||
files, _ := os.ReadDir(tempDir)
|
files, _ := os.ReadDir(tempDir)
|
||||||
assert.Equal(t, 2, len(files))
|
require.Equal(t, 2, len(files))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLogTargetSettings(t *testing.T) {
|
func TestLogTargetSettings(t *testing.T) {
|
||||||
|
actualLogger = nil
|
||||||
cfg := LogConfig{
|
cfg := LogConfig{
|
||||||
LogTarget: "",
|
LogTarget: "",
|
||||||
Quiet: true,
|
Quiet: true,
|
||||||
}
|
}
|
||||||
SetupLogging(cfg)
|
err := SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
logger, isTelegrafLogger := actualLogger.(*telegrafLog)
|
logger, isTelegrafLogger := actualLogger.(*telegrafLog)
|
||||||
assert.True(t, isTelegrafLogger)
|
require.True(t, isTelegrafLogger)
|
||||||
assert.Equal(t, logger.internalWriter, os.Stderr)
|
require.Equal(t, logger.internalWriter, os.Stderr)
|
||||||
|
|
||||||
cfg = LogConfig{
|
cfg = LogConfig{
|
||||||
LogTarget: "stderr",
|
LogTarget: "stderr",
|
||||||
Quiet: true,
|
Quiet: true,
|
||||||
}
|
}
|
||||||
SetupLogging(cfg)
|
err = SetupLogging(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
logger, isTelegrafLogger = actualLogger.(*telegrafLog)
|
logger, isTelegrafLogger = actualLogger.(*telegrafLog)
|
||||||
assert.True(t, isTelegrafLogger)
|
require.True(t, isTelegrafLogger)
|
||||||
assert.Equal(t, logger.internalWriter, os.Stderr)
|
require.Equal(t, logger.internalWriter, os.Stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkTelegrafLogWrite(b *testing.B) {
|
func BenchmarkTelegrafLogWrite(b *testing.B) {
|
||||||
|
|
@ -141,7 +154,10 @@ func BenchmarkTelegrafLogWrite(b *testing.B) {
|
||||||
}
|
}
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
w.Write(msg)
|
_, err = w.Write(msg)
|
||||||
|
if err != nil {
|
||||||
|
panic("Unable to write message")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -250,13 +250,13 @@ func (m *metric) Copy() telegraf.Metric {
|
||||||
|
|
||||||
func (m *metric) HashID() uint64 {
|
func (m *metric) HashID() uint64 {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
h.Write([]byte(m.name))
|
h.Write([]byte(m.name)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
for _, tag := range m.tags {
|
for _, tag := range m.tags {
|
||||||
h.Write([]byte(tag.Key))
|
h.Write([]byte(tag.Key)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
h.Write([]byte(tag.Value))
|
h.Write([]byte(tag.Value)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
}
|
}
|
||||||
return h.Sum64()
|
return h.Sum64()
|
||||||
}
|
}
|
||||||
|
|
@ -270,7 +270,7 @@ func (m *metric) Reject() {
|
||||||
func (m *metric) Drop() {
|
func (m *metric) Drop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert field to a supported type or nil if unconvertible
|
// Convert field to a supported type or nil if inconvertible
|
||||||
func convertField(v interface{}) interface{} {
|
func convertField(v interface{}) interface{} {
|
||||||
switch v := v.(type) {
|
switch v := v.(type) {
|
||||||
case float64:
|
case float64:
|
||||||
|
|
|
||||||
|
|
@ -88,20 +88,20 @@ func groupID(seed maphash.Seed, measurement string, taglist []*telegraf.Tag, tm
|
||||||
var mh maphash.Hash
|
var mh maphash.Hash
|
||||||
mh.SetSeed(seed)
|
mh.SetSeed(seed)
|
||||||
|
|
||||||
mh.WriteString(measurement)
|
mh.WriteString(measurement) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
mh.WriteByte(0)
|
mh.WriteByte(0) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
|
|
||||||
for _, tag := range taglist {
|
for _, tag := range taglist {
|
||||||
mh.WriteString(tag.Key)
|
mh.WriteString(tag.Key) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
mh.WriteByte(0)
|
mh.WriteByte(0) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
mh.WriteString(tag.Value)
|
mh.WriteString(tag.Value) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
mh.WriteByte(0)
|
mh.WriteByte(0) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
}
|
}
|
||||||
mh.WriteByte(0)
|
mh.WriteByte(0) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
|
|
||||||
var tsBuf [8]byte
|
var tsBuf [8]byte
|
||||||
binary.BigEndian.PutUint64(tsBuf[:], uint64(tm.UnixNano()))
|
binary.BigEndian.PutUint64(tsBuf[:], uint64(tm.UnixNano()))
|
||||||
mh.Write(tsBuf[:])
|
mh.Write(tsBuf[:]) //nolint:revive // all Write***() methods for hash in maphash.go returns nil err
|
||||||
|
|
||||||
return mh.Sum64()
|
return mh.Sum64()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,11 +6,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/selfstat"
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var first5 = []telegraf.Metric{
|
var first5 = []telegraf.Metric{
|
||||||
|
|
@ -40,7 +40,7 @@ func BenchmarkRunningOutputAddWrite(b *testing.B) {
|
||||||
|
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
ro.Write()
|
ro.Write() //nolint: revive // skip checking err for benchmark tests
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -56,7 +56,7 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
if n%100 == 0 {
|
if n%100 == 0 {
|
||||||
ro.Write()
|
ro.Write() //nolint: revive // skip checking err for benchmark tests
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -83,7 +83,7 @@ func TestRunningOutput_DropFilter(t *testing.T) {
|
||||||
NameDrop: []string{"metric1", "metric2"},
|
NameDrop: []string{"metric1", "metric2"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
@ -94,11 +94,11 @@ func TestRunningOutput_DropFilter(t *testing.T) {
|
||||||
for _, metric := range next5 {
|
for _, metric := range next5 {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 8)
|
require.Len(t, m.Metrics(), 8)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that NameDrop filters without a match do nothing.
|
// Test that NameDrop filters without a match do nothing.
|
||||||
|
|
@ -108,7 +108,7 @@ func TestRunningOutput_PassFilter(t *testing.T) {
|
||||||
NameDrop: []string{"metric1000", "foo*"},
|
NameDrop: []string{"metric1000", "foo*"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
@ -119,11 +119,11 @@ func TestRunningOutput_PassFilter(t *testing.T) {
|
||||||
for _, metric := range next5 {
|
for _, metric := range next5 {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 10)
|
require.Len(t, m.Metrics(), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that tags are properly included
|
// Test that tags are properly included
|
||||||
|
|
@ -133,18 +133,18 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
|
||||||
TagInclude: []string{"nothing*"},
|
TagInclude: []string{"nothing*"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Empty(t, m.Metrics()[0].Tags())
|
require.Empty(t, m.Metrics()[0].Tags())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that tags are properly excluded
|
// Test that tags are properly excluded
|
||||||
|
|
@ -154,18 +154,18 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
|
||||||
TagExclude: []string{"tag*"},
|
TagExclude: []string{"tag*"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Len(t, m.Metrics()[0].Tags(), 0)
|
require.Len(t, m.Metrics()[0].Tags(), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that tags are properly Excluded
|
// Test that tags are properly Excluded
|
||||||
|
|
@ -175,18 +175,18 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
|
||||||
TagExclude: []string{"nothing*"},
|
TagExclude: []string{"nothing*"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Len(t, m.Metrics()[0].Tags(), 1)
|
require.Len(t, m.Metrics()[0].Tags(), 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that tags are properly included
|
// Test that tags are properly included
|
||||||
|
|
@ -196,18 +196,18 @@ func TestRunningOutput_TagIncludeMatch(t *testing.T) {
|
||||||
TagInclude: []string{"tag*"},
|
TagInclude: []string{"tag*"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(t, conf.Filter.Compile())
|
require.NoError(t, conf.Filter.Compile())
|
||||||
|
|
||||||
m := &mockOutput{}
|
m := &mockOutput{}
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Len(t, m.Metrics()[0].Tags(), 1)
|
require.Len(t, m.Metrics()[0].Tags(), 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that measurement name overriding correctly
|
// Test that measurement name overriding correctly
|
||||||
|
|
@ -220,12 +220,12 @@ func TestRunningOutput_NameOverride(t *testing.T) {
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Equal(t, "new_metric_name", m.Metrics()[0].Name())
|
require.Equal(t, "new_metric_name", m.Metrics()[0].Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that measurement name prefix is added correctly
|
// Test that measurement name prefix is added correctly
|
||||||
|
|
@ -238,12 +238,12 @@ func TestRunningOutput_NamePrefix(t *testing.T) {
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Equal(t, "prefix_metric1", m.Metrics()[0].Name())
|
require.Equal(t, "prefix_metric1", m.Metrics()[0].Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that measurement name suffix is added correctly
|
// Test that measurement name suffix is added correctly
|
||||||
|
|
@ -256,12 +256,12 @@ func TestRunningOutput_NameSuffix(t *testing.T) {
|
||||||
ro := NewRunningOutput(m, conf, 1000, 10000)
|
ro := NewRunningOutput(m, conf, 1000, 10000)
|
||||||
|
|
||||||
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
ro.AddMetric(testutil.TestMetric(101, "metric1"))
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 1)
|
require.Len(t, m.Metrics(), 1)
|
||||||
assert.Equal(t, "metric1_suffix", m.Metrics()[0].Name())
|
require.Equal(t, "metric1_suffix", m.Metrics()[0].Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that we can write metrics with simple default setup.
|
// Test that we can write metrics with simple default setup.
|
||||||
|
|
@ -279,11 +279,11 @@ func TestRunningOutputDefault(t *testing.T) {
|
||||||
for _, metric := range next5 {
|
for _, metric := range next5 {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, m.Metrics(), 10)
|
require.Len(t, m.Metrics(), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRunningOutputWriteFail(t *testing.T) {
|
func TestRunningOutputWriteFail(t *testing.T) {
|
||||||
|
|
@ -303,22 +303,22 @@ func TestRunningOutputWriteFail(t *testing.T) {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// manual write fails
|
// manual write fails
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
m.failWrite = false
|
m.failWrite = false
|
||||||
err = ro.Write()
|
err = ro.Write()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Len(t, m.Metrics(), 10)
|
require.Len(t, m.Metrics(), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that the order of points is preserved during a write failure.
|
// Verify that the order of points is preserved during write failure.
|
||||||
func TestRunningOutputWriteFailOrder(t *testing.T) {
|
func TestRunningOutputWriteFailOrder(t *testing.T) {
|
||||||
conf := &OutputConfig{
|
conf := &OutputConfig{
|
||||||
Filter: Filter{},
|
Filter: Filter{},
|
||||||
|
|
@ -333,13 +333,13 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// Write fails
|
// Write fails
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
m.failWrite = false
|
m.failWrite = false
|
||||||
// add 5 more metrics
|
// add 5 more metrics
|
||||||
|
|
@ -350,10 +350,10 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Verify that 10 metrics were written
|
// Verify that 10 metrics were written
|
||||||
assert.Len(t, m.Metrics(), 10)
|
require.Len(t, m.Metrics(), 10)
|
||||||
// Verify that they are in order
|
// Verify that they are in order
|
||||||
expected := append(first5, next5...)
|
expected := append(first5, next5...)
|
||||||
assert.Equal(t, expected, m.Metrics())
|
require.Equal(t, expected, m.Metrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that the order of points is preserved during many write failures.
|
// Verify that the order of points is preserved during many write failures.
|
||||||
|
|
@ -374,7 +374,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// add 5 metrics
|
// add 5 metrics
|
||||||
for _, metric := range next5 {
|
for _, metric := range next5 {
|
||||||
|
|
@ -384,7 +384,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
|
||||||
err = ro.Write()
|
err = ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// add 5 metrics
|
// add 5 metrics
|
||||||
for _, metric := range first5 {
|
for _, metric := range first5 {
|
||||||
|
|
@ -394,7 +394,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
|
||||||
err = ro.Write()
|
err = ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// add 5 metrics
|
// add 5 metrics
|
||||||
for _, metric := range next5 {
|
for _, metric := range next5 {
|
||||||
|
|
@ -404,19 +404,19 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
|
||||||
err = ro.Write()
|
err = ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
m.failWrite = false
|
m.failWrite = false
|
||||||
err = ro.Write()
|
err = ro.Write()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Verify that 20 metrics were written
|
// Verify that 20 metrics were written
|
||||||
assert.Len(t, m.Metrics(), 20)
|
require.Len(t, m.Metrics(), 20)
|
||||||
// Verify that they are in order
|
// Verify that they are in order
|
||||||
expected := append(first5, next5...)
|
expected := append(first5, next5...)
|
||||||
expected = append(expected, first5...)
|
expected = append(expected, first5...)
|
||||||
expected = append(expected, next5...)
|
expected = append(expected, next5...)
|
||||||
assert.Equal(t, expected, m.Metrics())
|
require.Equal(t, expected, m.Metrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that the order of points is preserved when there is a remainder
|
// Verify that the order of points is preserved when there is a remainder
|
||||||
|
|
@ -435,13 +435,13 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
|
||||||
ro.AddMetric(metric)
|
ro.AddMetric(metric)
|
||||||
}
|
}
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// Write fails
|
// Write fails
|
||||||
err := ro.Write()
|
err := ro.Write()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// no successful flush yet
|
// no successful flush yet
|
||||||
assert.Len(t, m.Metrics(), 0)
|
require.Len(t, m.Metrics(), 0)
|
||||||
|
|
||||||
// add and attempt to write a single metric:
|
// add and attempt to write a single metric:
|
||||||
ro.AddMetric(next5[0])
|
ro.AddMetric(next5[0])
|
||||||
|
|
@ -454,10 +454,10 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Verify that 6 metrics were written
|
// Verify that 6 metrics were written
|
||||||
assert.Len(t, m.Metrics(), 6)
|
require.Len(t, m.Metrics(), 6)
|
||||||
// Verify that they are in order
|
// Verify that they are in order
|
||||||
expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]}
|
expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]}
|
||||||
assert.Equal(t, expected, m.Metrics())
|
require.Equal(t, expected, m.Metrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInternalMetrics(t *testing.T) {
|
func TestInternalMetrics(t *testing.T) {
|
||||||
|
|
@ -508,7 +508,7 @@ type mockOutput struct {
|
||||||
|
|
||||||
metrics []telegraf.Metric
|
metrics []telegraf.Metric
|
||||||
|
|
||||||
// if true, mock a write failure
|
// if true, mock write failure
|
||||||
failWrite bool
|
failWrite bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -550,7 +550,7 @@ func (m *mockOutput) Metrics() []telegraf.Metric {
|
||||||
}
|
}
|
||||||
|
|
||||||
type perfOutput struct {
|
type perfOutput struct {
|
||||||
// if true, mock a write failure
|
// if true, mock write failure
|
||||||
failWrite bool
|
failWrite bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,10 @@
|
||||||
|
// Package sqltemplate
|
||||||
/*
|
/*
|
||||||
Templates are used for creation of the SQL used when creating and modifying tables. These templates are specified within
|
Templates are used for creation of the SQL used when creating and modifying tables. These templates are specified within
|
||||||
the configuration as the parameters 'create_templates', 'add_column_templates, 'tag_table_create_templates', and
|
the configuration as the parameters 'create_templates', 'add_column_templates', 'tag_table_create_templates', and
|
||||||
'tag_table_add_column_templates'.
|
'tag_table_add_column_templates'.
|
||||||
|
|
||||||
The templating functionality behaves the same in all cases. However the variables will differ.
|
The templating functionality behaves the same in all cases. However, the variables will differ.
|
||||||
|
|
||||||
# Variables
|
# Variables
|
||||||
|
|
||||||
|
|
@ -119,9 +120,9 @@ import (
|
||||||
"text/template"
|
"text/template"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
|
||||||
|
|
||||||
"github.com/Masterminds/sprig"
|
"github.com/Masterminds/sprig"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var templateFuncs = map[string]interface{}{
|
var templateFuncs = map[string]interface{}{
|
||||||
|
|
@ -235,7 +236,8 @@ func (tc Column) Identifier() string {
|
||||||
return QuoteIdentifier(tc.Name)
|
return QuoteIdentifier(tc.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Selector returns the selector for the column. For most cases this is the same as Identifier. However in some cases, such as a UNION, this may return a statement such as `NULL AS "foo"`.
|
// Selector returns the selector for the column. For most cases this is the same as Identifier.
|
||||||
|
// However, in some cases, such as a UNION, this may return a statement such as `NULL AS "foo"`.
|
||||||
func (tc Column) Selector() string {
|
func (tc Column) Selector() string {
|
||||||
if tc.Type != "" {
|
if tc.Type != "" {
|
||||||
return tc.Identifier()
|
return tc.Identifier()
|
||||||
|
|
@ -243,12 +245,12 @@ func (tc Column) Selector() string {
|
||||||
return "NULL AS " + tc.Identifier()
|
return "NULL AS " + tc.Identifier()
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsTag returns true if the column is a tag column. Otherwise false.
|
// IsTag returns true if the column is a tag column. Otherwise, false.
|
||||||
func (tc Column) IsTag() bool {
|
func (tc Column) IsTag() bool {
|
||||||
return tc.Role == utils.TagColType
|
return tc.Role == utils.TagColType
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsField returns true if the column is a field column. Otherwise false.
|
// IsField returns true if the column is a field column. Otherwise, false.
|
||||||
func (tc Column) IsField() bool {
|
func (tc Column) IsField() bool {
|
||||||
return tc.Role == utils.FieldColType
|
return tc.Role == utils.FieldColType
|
||||||
}
|
}
|
||||||
|
|
@ -381,8 +383,8 @@ func (cols Columns) Fields() Columns {
|
||||||
func (cols Columns) Hash() string {
|
func (cols Columns) Hash() string {
|
||||||
hash := fnv.New32a()
|
hash := fnv.New32a()
|
||||||
for _, tc := range cols.Sorted() {
|
for _, tc := range cols.Sorted() {
|
||||||
_, _ = hash.Write([]byte(tc.Name))
|
hash.Write([]byte(tc.Name)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
_, _ = hash.Write([]byte{0})
|
hash.Write([]byte{0}) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
}
|
}
|
||||||
return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash.Sum(nil)))
|
return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash.Sum(nil)))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ func NewTableSources(p *Postgresql, metrics []telegraf.Metric) map[string]*Table
|
||||||
|
|
||||||
func NewTableSource(postgresql *Postgresql, name string) *TableSource {
|
func NewTableSource(postgresql *Postgresql, name string) *TableSource {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
_, _ = h.Write([]byte(name))
|
h.Write([]byte(name)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
|
|
||||||
tsrc := &TableSource{
|
tsrc := &TableSource{
|
||||||
postgresql: postgresql,
|
postgresql: postgresql,
|
||||||
|
|
@ -129,7 +129,7 @@ func (tsrc *TableSource) Name() string {
|
||||||
return tsrc.metrics[0].Name()
|
return tsrc.metrics[0].Name()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the superset of all tags of all metrics.
|
// TagColumns returns the superset of all tags of all metrics.
|
||||||
func (tsrc *TableSource) TagColumns() []utils.Column {
|
func (tsrc *TableSource) TagColumns() []utils.Column {
|
||||||
var cols []utils.Column
|
var cols []utils.Column
|
||||||
|
|
||||||
|
|
@ -142,12 +142,12 @@ func (tsrc *TableSource) TagColumns() []utils.Column {
|
||||||
return cols
|
return cols
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the superset of all fields of all metrics.
|
// FieldColumns returns the superset of all fields of all metrics.
|
||||||
func (tsrc *TableSource) FieldColumns() []utils.Column {
|
func (tsrc *TableSource) FieldColumns() []utils.Column {
|
||||||
return tsrc.fieldColumns.columns
|
return tsrc.fieldColumns.columns
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the full column list, including time, tag id or tags, and fields.
|
// MetricTableColumns returns the full column list, including time, tag id or tags, and fields.
|
||||||
func (tsrc *TableSource) MetricTableColumns() []utils.Column {
|
func (tsrc *TableSource) MetricTableColumns() []utils.Column {
|
||||||
cols := []utils.Column{
|
cols := []utils.Column{
|
||||||
timeColumn,
|
timeColumn,
|
||||||
|
|
@ -187,7 +187,7 @@ func (tsrc *TableSource) ColumnNames() []string {
|
||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drops the specified column.
|
// DropColumn drops the specified column.
|
||||||
// If column is a tag column, any metrics containing the tag will be skipped.
|
// If column is a tag column, any metrics containing the tag will be skipped.
|
||||||
// If column is a field column, any metrics containing the field will have it omitted.
|
// If column is a field column, any metrics containing the field will have it omitted.
|
||||||
func (tsrc *TableSource) DropColumn(col utils.Column) error {
|
func (tsrc *TableSource) DropColumn(col utils.Column) error {
|
||||||
|
|
@ -272,7 +272,7 @@ func (tsrc *TableSource) getValues() ([]interface{}, error) {
|
||||||
for _, tag := range metric.TagList() {
|
for _, tag := range metric.TagList() {
|
||||||
tagPos, ok := tsrc.tagColumns.indices[tag.Key]
|
tagPos, ok := tsrc.tagColumns.indices[tag.Key]
|
||||||
if !ok {
|
if !ok {
|
||||||
// tag has been dropped, we can't emit or we risk collision with another metric
|
// tag has been dropped, we can't emit, or we risk collision with another metric
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
tagValues[tagPos] = tag.Value
|
tagValues[tagPos] = tag.Value
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ func FullTableName(schema, name string) pgx.Identifier {
|
||||||
return pgx.Identifier{name}
|
return pgx.Identifier{name}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pgxLogger makes telegraf.Logger compatible with pgx.Logger
|
// PGXLogger makes telegraf.Logger compatible with pgx.Logger
|
||||||
type PGXLogger struct {
|
type PGXLogger struct {
|
||||||
telegraf.Logger
|
telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
@ -71,10 +71,10 @@ func (l PGXLogger) Log(_ context.Context, level pgx.LogLevel, msg string, data m
|
||||||
func GetTagID(metric telegraf.Metric) int64 {
|
func GetTagID(metric telegraf.Metric) int64 {
|
||||||
hash := fnv.New64a()
|
hash := fnv.New64a()
|
||||||
for _, tag := range metric.TagList() {
|
for _, tag := range metric.TagList() {
|
||||||
_, _ = hash.Write([]byte(tag.Key))
|
hash.Write([]byte(tag.Key)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
_, _ = hash.Write([]byte{0})
|
hash.Write([]byte{0}) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
_, _ = hash.Write([]byte(tag.Value))
|
hash.Write([]byte(tag.Value)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
_, _ = hash.Write([]byte{0})
|
hash.Write([]byte{0}) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
}
|
}
|
||||||
// Convert to int64 as postgres does not support uint64
|
// Convert to int64 as postgres does not support uint64
|
||||||
return int64(hash.Sum64())
|
return int64(hash.Sum64())
|
||||||
|
|
|
||||||
|
|
@ -177,7 +177,7 @@ func (r *Registry) set(key uint64, s Stat) {
|
||||||
|
|
||||||
func key(measurement string, tags map[string]string) uint64 {
|
func key(measurement string, tags map[string]string) uint64 {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
h.Write([]byte(measurement))
|
h.Write([]byte(measurement)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
|
|
||||||
tmp := make([]string, len(tags))
|
tmp := make([]string, len(tags))
|
||||||
i := 0
|
i := 0
|
||||||
|
|
@ -188,7 +188,7 @@ func key(measurement string, tags map[string]string) uint64 {
|
||||||
sort.Strings(tmp)
|
sort.Strings(tmp)
|
||||||
|
|
||||||
for _, s := range tmp {
|
for _, s := range tmp {
|
||||||
h.Write([]byte(s))
|
h.Write([]byte(s)) //nolint:revive // all Write() methods for hash in fnv.go returns nil err
|
||||||
}
|
}
|
||||||
|
|
||||||
return h.Sum64()
|
return h.Sum64()
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,13 @@ package testutil
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerHost(t *testing.T) {
|
func TestDockerHost(t *testing.T) {
|
||||||
os.Unsetenv("DOCKER_HOST")
|
err := os.Unsetenv("DOCKER_HOST")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
host := GetLocalHost()
|
host := GetLocalHost()
|
||||||
|
|
||||||
|
|
@ -14,7 +17,8 @@ func TestDockerHost(t *testing.T) {
|
||||||
t.Fatalf("Host should be localhost when DOCKER_HOST is not set. Current value [%s]", host)
|
t.Fatalf("Host should be localhost when DOCKER_HOST is not set. Current value [%s]", host)
|
||||||
}
|
}
|
||||||
|
|
||||||
os.Setenv("DOCKER_HOST", "1.1.1.1")
|
err = os.Setenv("DOCKER_HOST", "1.1.1.1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
host = GetLocalHost()
|
host = GetLocalHost()
|
||||||
|
|
||||||
|
|
@ -22,7 +26,8 @@ func TestDockerHost(t *testing.T) {
|
||||||
t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST"))
|
t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST"))
|
||||||
}
|
}
|
||||||
|
|
||||||
os.Setenv("DOCKER_HOST", "tcp://1.1.1.1:8080")
|
err = os.Setenv("DOCKER_HOST", "tcp://1.1.1.1:8080")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
host = GetLocalHost()
|
host = GetLocalHost()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue