feat(inputs.ntpq): Add possibility to query remote servers (#11592)
This commit is contained in:
parent
a049738a50
commit
e33ffeb06a
|
|
@ -35,6 +35,10 @@ server (RMS of difference of multiple time samples, milliseconds);
|
||||||
|
|
||||||
## Options to pass to the ntpq command.
|
## Options to pass to the ntpq command.
|
||||||
# options = "-p"
|
# options = "-p"
|
||||||
|
|
||||||
|
## Servers to query with ntpq.
|
||||||
|
## If no server is given, the local machine is queried.
|
||||||
|
# servers = []
|
||||||
```
|
```
|
||||||
|
|
||||||
You can pass arbitrary options accepted by the `ntpq` command using the
|
You can pass arbitrary options accepted by the `ntpq` command using the
|
||||||
|
|
@ -58,11 +62,15 @@ for example.
|
||||||
|
|
||||||
### Tags
|
### Tags
|
||||||
|
|
||||||
- All measurements have the following tags:
|
All measurements have the following tags:
|
||||||
- refid
|
|
||||||
- remote
|
- refid
|
||||||
- type
|
- remote
|
||||||
- stratum
|
- type
|
||||||
|
- stratum
|
||||||
|
|
||||||
|
In case you are specifying `servers`, the measurement has an
|
||||||
|
additional `source` tag.
|
||||||
|
|
||||||
## Example Output
|
## Example Output
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,10 +61,11 @@ var fieldElements = map[string]elementType{
|
||||||
}
|
}
|
||||||
|
|
||||||
type NTPQ struct {
|
type NTPQ struct {
|
||||||
DNSLookup bool `toml:"dns_lookup" deprecated:"1.24.0;add '-n' to 'options' instead to skip DNS lookup"`
|
DNSLookup bool `toml:"dns_lookup" deprecated:"1.24.0;add '-n' to 'options' instead to skip DNS lookup"`
|
||||||
Options string `toml:"options"`
|
Options string `toml:"options"`
|
||||||
|
Servers []string `toml:"servers"`
|
||||||
|
|
||||||
runQ func() ([]byte, error)
|
runQ func(string) ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*NTPQ) SampleConfig() string {
|
func (*NTPQ) SampleConfig() string {
|
||||||
|
|
@ -72,23 +73,33 @@ func (*NTPQ) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NTPQ) Init() error {
|
func (n *NTPQ) Init() error {
|
||||||
|
if len(n.Servers) == 0 {
|
||||||
|
n.Servers = []string{""}
|
||||||
|
}
|
||||||
|
|
||||||
if n.runQ == nil {
|
if n.runQ == nil {
|
||||||
args, err := shellquote.Split(n.Options)
|
options, err := shellquote.Split(n.Options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("splitting options failed: %w", err)
|
return fmt.Errorf("splitting options failed: %w", err)
|
||||||
}
|
}
|
||||||
n.runQ = func() ([]byte, error) {
|
if !n.DNSLookup {
|
||||||
|
if !choice.Contains("-n", options) {
|
||||||
|
options = append(options, "-n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n.runQ = func(server string) ([]byte, error) {
|
||||||
bin, err := exec.LookPath("ntpq")
|
bin, err := exec.LookPath("ntpq")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !n.DNSLookup {
|
// Needs to be last argument
|
||||||
if !choice.Contains("-n", args) {
|
var args []string
|
||||||
args = append(args, "-n")
|
args = append(args, options...)
|
||||||
}
|
if server != "" {
|
||||||
|
args = append(args, server)
|
||||||
}
|
}
|
||||||
fmt.Println(args)
|
|
||||||
cmd := exec.Command(bin, args...)
|
cmd := exec.Command(bin, args...)
|
||||||
return cmd.Output()
|
return cmd.Output()
|
||||||
}
|
}
|
||||||
|
|
@ -97,9 +108,21 @@ func (n *NTPQ) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
||||||
out, err := n.runQ()
|
for _, server := range n.Servers {
|
||||||
|
n.gatherServer(acc, server)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
|
||||||
|
var msgPrefix string
|
||||||
|
if server != "" {
|
||||||
|
msgPrefix = fmt.Sprintf("[%s] ", server)
|
||||||
|
}
|
||||||
|
out, err := n.runQ(server)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
acc.AddError(fmt.Errorf("%s%w", msgPrefix, err))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
scanner := bufio.NewScanner(bytes.NewReader(out))
|
scanner := bufio.NewScanner(bytes.NewReader(out))
|
||||||
|
|
@ -152,6 +175,9 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
||||||
if prefix != "" {
|
if prefix != "" {
|
||||||
tags["state_prefix"] = prefix
|
tags["state_prefix"] = prefix
|
||||||
}
|
}
|
||||||
|
if server != "" {
|
||||||
|
tags["source"] = server
|
||||||
|
}
|
||||||
|
|
||||||
for i, raw := range elements {
|
for i, raw := range elements {
|
||||||
col := columns[i]
|
col := columns[i]
|
||||||
|
|
@ -164,14 +190,16 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
||||||
case FieldInt:
|
case FieldInt:
|
||||||
value, err := strconv.ParseInt(raw, 10, 64)
|
value, err := strconv.ParseInt(raw, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err))
|
msg := fmt.Sprintf("%sparsing %q (%v) as int failed", msgPrefix, col.name, raw)
|
||||||
|
acc.AddError(fmt.Errorf("%s: %w", msg, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fields[col.name] = value
|
fields[col.name] = value
|
||||||
case FieldFloat:
|
case FieldFloat:
|
||||||
value, err := strconv.ParseFloat(raw, 64)
|
value, err := strconv.ParseFloat(raw, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("parsing %q (%v) as float failed: %w", col.name, raw, err))
|
msg := fmt.Sprintf("%sparsing %q (%v) as float failed", msgPrefix, col.name, raw)
|
||||||
|
acc.AddError(fmt.Errorf("%s: %w", msg, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fields[col.name] = value
|
fields[col.name] = value
|
||||||
|
|
@ -190,7 +218,8 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
value, err := strconv.ParseInt(strings.TrimSuffix(raw, suffix), 10, 64)
|
value, err := strconv.ParseInt(strings.TrimSuffix(raw, suffix), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("parsing %q (%v) as duration failed: %w", col.name, raw, err))
|
msg := fmt.Sprintf("%sparsing %q (%v) as duration failed", msgPrefix, col.name, raw)
|
||||||
|
acc.AddError(fmt.Errorf("%s: %w", msg, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fields[col.name] = value * factor
|
fields[col.name] = value * factor
|
||||||
|
|
@ -199,8 +228,6 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
acc.AddFields("ntpq", fields, tags)
|
acc.AddFields("ntpq", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func processLine(line string) (string, []string) {
|
func processLine(line string) (string, []string) {
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,10 @@ package ntpq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
@ -35,11 +37,10 @@ func TestCases(t *testing.T) {
|
||||||
if !f.IsDir() {
|
if !f.IsDir() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
configFilename := filepath.Join("testcases", f.Name(), "telegraf.conf")
|
testcasePath := filepath.Join("testcases", f.Name())
|
||||||
inputFilename := filepath.Join("testcases", f.Name(), "input.txt")
|
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
||||||
inputErrorFilename := filepath.Join("testcases", f.Name(), "input.err")
|
expectedFilename := filepath.Join(testcasePath, "expected.out")
|
||||||
expectedFilename := filepath.Join("testcases", f.Name(), "expected.out")
|
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
|
||||||
expectedErrorFilename := filepath.Join("testcases", f.Name(), "expected.err")
|
|
||||||
|
|
||||||
// Compare options
|
// Compare options
|
||||||
options := []cmp.Option{
|
options := []cmp.Option{
|
||||||
|
|
@ -49,18 +50,9 @@ func TestCases(t *testing.T) {
|
||||||
|
|
||||||
t.Run(f.Name(), func(t *testing.T) {
|
t.Run(f.Name(), func(t *testing.T) {
|
||||||
// Read the input data
|
// Read the input data
|
||||||
data, err := os.ReadFile(inputFilename)
|
inputData, inputErrors, err := readInputData(testcasePath)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Read the input error message if any
|
|
||||||
var inputErr error
|
|
||||||
if _, err := os.Stat(inputErrorFilename); err == nil {
|
|
||||||
x, err := testutil.ParseLinesFromFile(inputErrorFilename)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, x, 1)
|
|
||||||
inputErr = errors.New(x[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the expected output if any
|
// Read the expected output if any
|
||||||
var expected []telegraf.Metric
|
var expected []telegraf.Metric
|
||||||
if _, err := os.Stat(expectedFilename); err == nil {
|
if _, err := os.Stat(expectedFilename); err == nil {
|
||||||
|
|
@ -70,12 +62,12 @@ func TestCases(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the expected output if any
|
// Read the expected output if any
|
||||||
var errorMsg string
|
var expectedErrors []string
|
||||||
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
||||||
x, err := testutil.ParseLinesFromFile(expectedErrorFilename)
|
var err error
|
||||||
|
expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, x, 1)
|
require.NotEmpty(t, expectedErrors)
|
||||||
errorMsg = x[0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configure the plugin
|
// Configure the plugin
|
||||||
|
|
@ -85,21 +77,70 @@ func TestCases(t *testing.T) {
|
||||||
|
|
||||||
// Fake the reading
|
// Fake the reading
|
||||||
plugin := cfg.Inputs[0].Input.(*NTPQ)
|
plugin := cfg.Inputs[0].Input.(*NTPQ)
|
||||||
plugin.runQ = func() ([]byte, error) {
|
plugin.runQ = func(server string) ([]byte, error) {
|
||||||
return data, inputErr
|
return inputData[server], inputErrors[server]
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Init())
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
if errorMsg != "" {
|
require.NoError(t, plugin.Gather(&acc))
|
||||||
require.EqualError(t, plugin.Gather(&acc), errorMsg)
|
if len(acc.Errors) > 0 {
|
||||||
return
|
var actualErrorMsgs []string
|
||||||
|
for _, err := range acc.Errors {
|
||||||
|
actualErrorMsgs = append(actualErrorMsgs, err.Error())
|
||||||
|
}
|
||||||
|
require.ElementsMatch(t, actualErrorMsgs, expectedErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
// No error case
|
// Check the metric nevertheless as we might get some metrics despite errors.
|
||||||
require.NoError(t, plugin.Gather(&acc))
|
|
||||||
actual := acc.GetTelegrafMetrics()
|
actual := acc.GetTelegrafMetrics()
|
||||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readInputData(path string) (map[string][]byte, map[string]error, error) {
|
||||||
|
// Get all elements in the testcase directory
|
||||||
|
entries, err := os.ReadDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make(map[string][]byte)
|
||||||
|
errs := make(map[string]error)
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.IsDir() || !strings.HasPrefix(e.Name(), "input") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := filepath.Join(path, e.Name())
|
||||||
|
ext := filepath.Ext(e.Name())
|
||||||
|
server := strings.TrimPrefix(e.Name(), "input")
|
||||||
|
server = strings.TrimPrefix(server, "_") // This needs to be separate for non-server cases
|
||||||
|
server = strings.TrimSuffix(server, ext)
|
||||||
|
|
||||||
|
switch ext {
|
||||||
|
case ".txt":
|
||||||
|
// Read the input data
|
||||||
|
d, err := os.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("reading %q failed: %w", filename, err)
|
||||||
|
}
|
||||||
|
data[server] = d
|
||||||
|
case ".err":
|
||||||
|
// Read the input error message
|
||||||
|
msgs, err := testutil.ParseLinesFromFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("reading error %q failed: %w", filename, err)
|
||||||
|
}
|
||||||
|
if len(msgs) != 1 {
|
||||||
|
return nil, nil, fmt.Errorf("unexpected number of errors: %d", len(msgs))
|
||||||
|
}
|
||||||
|
errs[server] = errors.New(msgs[0])
|
||||||
|
default:
|
||||||
|
return nil, nil, fmt.Errorf("unexpected input %q", filename)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, errs, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,3 +6,7 @@
|
||||||
|
|
||||||
## Options to pass to the ntpq command.
|
## Options to pass to the ntpq command.
|
||||||
# options = "-p"
|
# options = "-p"
|
||||||
|
|
||||||
|
## Servers to query with ntpq.
|
||||||
|
## If no server is given, the local machine is queried.
|
||||||
|
# servers = []
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
parsing "offset" (foobar) as float failed: strconv.ParseFloat: parsing "foobar": invalid syntax
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
parsing "poll" (foobar) as duration failed: strconv.ParseInt: parsing "foobar": invalid syntax
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
parsing "when" (2q) as duration failed: strconv.ParseInt: parsing "2q": invalid syntax
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
ntpq,source=serverA,refid=10.177.80.46,remote=uschi5-ntp-002.,state_prefix=*,stratum=2,type=u when=101i,poll=256i,reach=37i,delay=51.016,offset=233.010,jitter=17.462 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=83.137.98.96,stratum=2,type=u delay=54.033,offset=243.426,jitter=449514,when=740i,poll=1024i,reach=377i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=81.7.16.52,stratum=2,type=u reach=377i,delay=60.785,offset=232.597,jitter=449539,when=739i,poll=1024i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=131.188.3.221,stratum=2,type=u when=783i,poll=1024i,reach=377i,delay=111.82,offset=261.921,jitter=449528 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=5.9.29.107,stratum=2,type=u reach=377i,delay=205.704,offset=160.406,jitter=449602,when=703i,poll=1024i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=91.189.94.4,stratum=2,type=u offset=274.726,jitter=449445,when=673i,poll=1024i,reach=377i,delay=143.047 0
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
remote refid st t when poll reach delay offset jitter
|
||||||
|
==============================================================================
|
||||||
|
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
remote refid st t when poll reach delay offset jitter
|
||||||
|
==============================================================================
|
||||||
|
83.137.98.96 10.177.80.37 2 u 740 1024 377 54.033 243.426 449514.
|
||||||
|
81.7.16.52 10.177.80.37 2 u 739 1024 377 60.785 232.597 449539.
|
||||||
|
131.188.3.221 10.177.80.37 2 u 783 1024 377 111.820 261.921 449528.
|
||||||
|
5.9.29.107 10.177.80.37 2 u 703 1024 377 205.704 160.406 449602.
|
||||||
|
91.189.94.4 10.177.80.37 2 u 673 1024 377 143.047 274.726 449445.
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
[[inputs.ntpq]]
|
||||||
|
servers = ["serverA", "serverB"]
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
[serverC] not authorized
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
ntpq,source=serverA,refid=10.177.80.46,remote=uschi5-ntp-002.,state_prefix=*,stratum=2,type=u when=101i,poll=256i,reach=37i,delay=51.016,offset=233.010,jitter=17.462 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=83.137.98.96,stratum=2,type=u delay=54.033,offset=243.426,jitter=449514,when=740i,poll=1024i,reach=377i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=81.7.16.52,stratum=2,type=u reach=377i,delay=60.785,offset=232.597,jitter=449539,when=739i,poll=1024i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=131.188.3.221,stratum=2,type=u when=783i,poll=1024i,reach=377i,delay=111.82,offset=261.921,jitter=449528 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=5.9.29.107,stratum=2,type=u reach=377i,delay=205.704,offset=160.406,jitter=449602,when=703i,poll=1024i 0
|
||||||
|
ntpq,source=serverB,refid=10.177.80.37,remote=91.189.94.4,stratum=2,type=u offset=274.726,jitter=449445,when=673i,poll=1024i,reach=377i,delay=143.047 0
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
remote refid st t when poll reach delay offset jitter
|
||||||
|
==============================================================================
|
||||||
|
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
remote refid st t when poll reach delay offset jitter
|
||||||
|
==============================================================================
|
||||||
|
83.137.98.96 10.177.80.37 2 u 740 1024 377 54.033 243.426 449514.
|
||||||
|
81.7.16.52 10.177.80.37 2 u 739 1024 377 60.785 232.597 449539.
|
||||||
|
131.188.3.221 10.177.80.37 2 u 783 1024 377 111.820 261.921 449528.
|
||||||
|
5.9.29.107 10.177.80.37 2 u 703 1024 377 205.704 160.406 449602.
|
||||||
|
91.189.94.4 10.177.80.37 2 u 673 1024 377 143.047 274.726 449445.
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
not authorized
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
[[inputs.ntpq]]
|
||||||
|
servers = ["serverA", "serverB", "serverC"]
|
||||||
Loading…
Reference in New Issue