feat(inputs.chrony): Allow to collect additional metrics (#14673)
This commit is contained in:
parent
f235fcc640
commit
7d90a52ed3
|
|
@ -31,6 +31,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## Try to resolve received addresses to host-names via DNS lookups
|
||||
## Disabled by default to avoid DNS queries especially for slow DNS servers.
|
||||
# dns_lookup = false
|
||||
|
||||
## Metrics to query named according to chronyc commands
|
||||
## Available settings are:
|
||||
## activity -- number of peers online or offline
|
||||
## tracking -- information about system's clock performance
|
||||
## serverstats -- chronyd server statistics
|
||||
## sources -- extended information about peers
|
||||
## sourcestats -- statistics on peers
|
||||
# metrics = ["tracking"]
|
||||
```
|
||||
|
||||
## Metrics
|
||||
|
|
|
|||
|
|
@ -2,13 +2,14 @@
|
|||
package chrony
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
fbchrony "github.com/facebook/time/ntp/chrony"
|
||||
|
|
@ -25,10 +26,12 @@ type Chrony struct {
|
|||
Server string `toml:"server"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
DNSLookup bool `toml:"dns_lookup"`
|
||||
Metrics []string `toml:"metrics"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
conn net.Conn
|
||||
client *fbchrony.Client
|
||||
source string
|
||||
}
|
||||
|
||||
func (*Chrony) SampleConfig() string {
|
||||
|
|
@ -36,6 +39,7 @@ func (*Chrony) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (c *Chrony) Init() error {
|
||||
// Use the configured server, if none set, we try to guess it in Start()
|
||||
if c.Server != "" {
|
||||
// Check the specified server address
|
||||
u, err := url.Parse(c.Server)
|
||||
|
|
@ -61,6 +65,19 @@ func (c *Chrony) Init() error {
|
|||
c.Server = u.String()
|
||||
}
|
||||
|
||||
// Check the given metrics
|
||||
if len(c.Metrics) == 0 {
|
||||
c.Metrics = append(c.Metrics, "tracking")
|
||||
}
|
||||
for _, m := range c.Metrics {
|
||||
switch m {
|
||||
case "activity", "tracking", "serverstats", "sources", "sourcestats":
|
||||
// Do nothing as those are valid
|
||||
default:
|
||||
return fmt.Errorf("invalid metric setting %q", m)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -78,12 +95,14 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
|
|||
return fmt.Errorf("dialing %q failed: %w", c.Server, err)
|
||||
}
|
||||
c.conn = conn
|
||||
c.source = u.Path
|
||||
case "udp":
|
||||
conn, err := net.DialTimeout("udp", u.Host, time.Duration(c.Timeout))
|
||||
if err != nil {
|
||||
return fmt.Errorf("dialing %q failed: %w", c.Server, err)
|
||||
}
|
||||
c.conn = conn
|
||||
c.source = u.Host
|
||||
}
|
||||
} else {
|
||||
// If no server is given, reproduce chronyc's behavior
|
||||
|
|
@ -112,26 +131,75 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
|
|||
|
||||
func (c *Chrony) Stop() {
|
||||
if c.conn != nil {
|
||||
if err := c.conn.Close(); err != nil {
|
||||
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
|
||||
c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Chrony) Gather(acc telegraf.Accumulator) error {
|
||||
for _, m := range c.Metrics {
|
||||
switch m {
|
||||
case "activity":
|
||||
acc.AddError(c.gatherActivity(acc))
|
||||
case "tracking":
|
||||
acc.AddError(c.gatherTracking(acc))
|
||||
case "serverstats":
|
||||
acc.AddError(c.gatherServerStats(acc))
|
||||
case "sources":
|
||||
acc.AddError(c.gatherSources(acc))
|
||||
case "sourcestats":
|
||||
acc.AddError(c.gatherSourceStats(acc))
|
||||
default:
|
||||
return fmt.Errorf("invalid metric setting %q", m)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error {
|
||||
req := fbchrony.NewActivityPacket()
|
||||
r, err := c.client.Communicate(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying activity data failed: %w", err)
|
||||
}
|
||||
resp, ok := r.(*fbchrony.ReplyActivity)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for activity data", r)
|
||||
}
|
||||
|
||||
tags := map[string]string{}
|
||||
if c.source != "" {
|
||||
tags["source"] = c.source
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"online": resp.Online,
|
||||
"offline": resp.Offline,
|
||||
"burst_online": resp.BurstOnline,
|
||||
"burst_offline": resp.BurstOffline,
|
||||
"unresolved": resp.Unresolved,
|
||||
}
|
||||
acc.AddFields("chrony_activity", fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Chrony) gatherTracking(acc telegraf.Accumulator) error {
|
||||
req := fbchrony.NewTrackingPacket()
|
||||
resp, err := c.client.Communicate(req)
|
||||
r, err := c.client.Communicate(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying tracking data failed: %w", err)
|
||||
}
|
||||
tracking, ok := resp.(*fbchrony.ReplyTracking)
|
||||
resp, ok := r.(*fbchrony.ReplyTracking)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for tracking data", resp)
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for tracking data", r)
|
||||
}
|
||||
|
||||
// according to https://github.com/mlichvar/chrony/blob/e11b518a1ffa704986fb1f1835c425844ba248ef/ntp.h#L70
|
||||
var leapStatus string
|
||||
switch tracking.LeapStatus {
|
||||
switch resp.LeapStatus {
|
||||
case 0:
|
||||
leapStatus = "normal"
|
||||
case 1:
|
||||
|
|
@ -144,24 +212,229 @@ func (c *Chrony) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
tags := map[string]string{
|
||||
"leap_status": leapStatus,
|
||||
"reference_id": strings.ToUpper(strconv.FormatUint(uint64(tracking.RefID), 16)),
|
||||
"stratum": strconv.FormatUint(uint64(tracking.Stratum), 10),
|
||||
"reference_id": fbchrony.RefidAsHEX(resp.RefID),
|
||||
"stratum": strconv.FormatUint(uint64(resp.Stratum), 10),
|
||||
}
|
||||
if c.source != "" {
|
||||
tags["source"] = c.source
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"frequency": tracking.FreqPPM,
|
||||
"system_time": tracking.CurrentCorrection,
|
||||
"last_offset": tracking.LastOffset,
|
||||
"residual_freq": tracking.ResidFreqPPM,
|
||||
"rms_offset": tracking.RMSOffset,
|
||||
"root_delay": tracking.RootDelay,
|
||||
"root_dispersion": tracking.RootDispersion,
|
||||
"skew": tracking.SkewPPM,
|
||||
"update_interval": tracking.LastUpdateInterval,
|
||||
"frequency": resp.FreqPPM,
|
||||
"system_time": resp.CurrentCorrection,
|
||||
"last_offset": resp.LastOffset,
|
||||
"residual_freq": resp.ResidFreqPPM,
|
||||
"rms_offset": resp.RMSOffset,
|
||||
"root_delay": resp.RootDelay,
|
||||
"root_dispersion": resp.RootDispersion,
|
||||
"skew": resp.SkewPPM,
|
||||
"update_interval": resp.LastUpdateInterval,
|
||||
}
|
||||
acc.AddFields("chrony", fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Chrony) gatherServerStats(acc telegraf.Accumulator) error {
|
||||
req := fbchrony.NewServerStatsPacket()
|
||||
r, err := c.client.Communicate(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying server statistics failed: %w", err)
|
||||
}
|
||||
|
||||
tags := map[string]string{}
|
||||
if c.source != "" {
|
||||
tags["source"] = c.source
|
||||
}
|
||||
|
||||
var fields map[string]interface{}
|
||||
switch resp := r.(type) {
|
||||
case *fbchrony.ReplyServerStats:
|
||||
fields = map[string]interface{}{
|
||||
"ntp_hits": resp.NTPHits,
|
||||
"ntp_drops": resp.NTPDrops,
|
||||
"cmd_hits": resp.CMDHits,
|
||||
"cmd_drops": resp.CMDDrops,
|
||||
"log_drops": resp.LogDrops,
|
||||
}
|
||||
case *fbchrony.ReplyServerStats2:
|
||||
fields = map[string]interface{}{
|
||||
"ntp_hits": resp.NTPHits,
|
||||
"ntp_drops": resp.NTPDrops,
|
||||
"ntp_auth_hits": resp.NTPAuthHits,
|
||||
"cmd_hits": resp.CMDHits,
|
||||
"cmd_drops": resp.CMDDrops,
|
||||
"log_drops": resp.LogDrops,
|
||||
"nke_hits": resp.NKEHits,
|
||||
"nke_drops": resp.NKEDrops,
|
||||
}
|
||||
case *fbchrony.ReplyServerStats3:
|
||||
fields = map[string]interface{}{
|
||||
"ntp_hits": resp.NTPHits,
|
||||
"ntp_drops": resp.NTPDrops,
|
||||
"ntp_auth_hits": resp.NTPAuthHits,
|
||||
"ntp_interleaved_hits": resp.NTPInterleavedHits,
|
||||
"ntp_timestamps": resp.NTPTimestamps,
|
||||
"ntp_span_seconds": resp.NTPSpanSeconds,
|
||||
"cmd_hits": resp.CMDHits,
|
||||
"cmd_drops": resp.CMDDrops,
|
||||
"log_drops": resp.LogDrops,
|
||||
"nke_hits": resp.NKEHits,
|
||||
"nke_drops": resp.NKEDrops,
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for server statistics", r)
|
||||
}
|
||||
|
||||
acc.AddFields("chrony_serverstats", fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Chrony) gatherSources(acc telegraf.Accumulator) error {
|
||||
sourcesReq := fbchrony.NewSourcesPacket()
|
||||
sourcesRaw, err := c.client.Communicate(sourcesReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying sources failed: %w", err)
|
||||
}
|
||||
|
||||
sourcesResp, ok := sourcesRaw.(*fbchrony.ReplySources)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for sources", sourcesRaw)
|
||||
}
|
||||
|
||||
for idx := int32(0); int(idx) < sourcesResp.NSources; idx++ {
|
||||
// Getting the source data
|
||||
sourceDataReq := fbchrony.NewSourceDataPacket(idx)
|
||||
sourceDataRaw, err := c.client.Communicate(sourceDataReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying data for source %d failed: %w", idx, err)
|
||||
}
|
||||
sourceData, ok := sourceDataRaw.(*fbchrony.ReplySourceData)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for source data", sourceDataRaw)
|
||||
}
|
||||
|
||||
// Trying to resolve the source name
|
||||
sourceNameReq := fbchrony.NewNTPSourceNamePacket(sourceData.IPAddr)
|
||||
sourceNameRaw, err := c.client.Communicate(sourceNameReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying name of source %d failed: %w", idx, err)
|
||||
}
|
||||
sourceName, ok := sourceNameRaw.(*fbchrony.ReplyNTPSourceName)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for source name", sourceNameRaw)
|
||||
}
|
||||
|
||||
// Cut the string at null termination
|
||||
var peer string
|
||||
if termidx := bytes.Index(sourceName.Name[:], []byte{0}); termidx >= 0 {
|
||||
peer = string(sourceName.Name[:termidx])
|
||||
} else {
|
||||
peer = string(sourceName.Name[:])
|
||||
}
|
||||
|
||||
if peer == "" {
|
||||
peer = sourceData.IPAddr.String()
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"peer": peer,
|
||||
}
|
||||
if c.source != "" {
|
||||
tags["source"] = c.source
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"index": idx,
|
||||
"ip": sourceData.IPAddr.String(),
|
||||
"poll": sourceData.Poll,
|
||||
"stratum": sourceData.Stratum,
|
||||
"state": sourceData.State.String(),
|
||||
"mode": sourceData.Mode.String(),
|
||||
"flags": sourceData.Flags,
|
||||
"reachability": sourceData.Reachability,
|
||||
"sample": sourceData.SinceSample,
|
||||
"latest_measurement": sourceData.LatestMeas,
|
||||
"latest_measurement_error": sourceData.LatestMeasErr,
|
||||
}
|
||||
acc.AddFields("chrony_sources", fields, tags)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Chrony) gatherSourceStats(acc telegraf.Accumulator) error {
|
||||
sourcesReq := fbchrony.NewSourcesPacket()
|
||||
sourcesRaw, err := c.client.Communicate(sourcesReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying sources failed: %w", err)
|
||||
}
|
||||
|
||||
sourcesResp, ok := sourcesRaw.(*fbchrony.ReplySources)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for sources", sourcesRaw)
|
||||
}
|
||||
|
||||
for idx := int32(0); int(idx) < sourcesResp.NSources; idx++ {
|
||||
// Getting the source data
|
||||
sourceStatsReq := fbchrony.NewSourceStatsPacket(idx)
|
||||
sourceStatsRaw, err := c.client.Communicate(sourceStatsReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying data for source %d failed: %w", idx, err)
|
||||
}
|
||||
sourceStats, ok := sourceStatsRaw.(*fbchrony.ReplySourceStats)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for source data", sourceStatsRaw)
|
||||
}
|
||||
|
||||
// Trying to resolve the source name
|
||||
sourceNameReq := fbchrony.NewNTPSourceNamePacket(sourceStats.IPAddr)
|
||||
sourceNameRaw, err := c.client.Communicate(sourceNameReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying name of source %d failed: %w", idx, err)
|
||||
}
|
||||
sourceName, ok := sourceNameRaw.(*fbchrony.ReplyNTPSourceName)
|
||||
if !ok {
|
||||
return fmt.Errorf("got unexpected response type %T while waiting for source name", sourceNameRaw)
|
||||
}
|
||||
|
||||
// Cut the string at null termination
|
||||
var peer string
|
||||
if termidx := bytes.Index(sourceName.Name[:], []byte{0}); termidx >= 0 {
|
||||
peer = string(sourceName.Name[:termidx])
|
||||
} else {
|
||||
peer = string(sourceName.Name[:])
|
||||
}
|
||||
|
||||
if peer == "" {
|
||||
peer = sourceStats.IPAddr.String()
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"reference_id": fbchrony.RefidAsHEX(sourceStats.RefID),
|
||||
"peer": peer,
|
||||
}
|
||||
if c.source != "" {
|
||||
tags["source"] = c.source
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"index": idx,
|
||||
"ip": sourceStats.IPAddr.String(),
|
||||
"samples": sourceStats.NSamples,
|
||||
"runs": sourceStats.NRuns,
|
||||
"span_seconds": sourceStats.SpanSeconds,
|
||||
"stddev": sourceStats.StandardDeviation,
|
||||
"residual_frequency": sourceStats.ResidFreqPPM,
|
||||
"skew": sourceStats.SkewPPM,
|
||||
"offset": sourceStats.EstimatedOffset,
|
||||
"offset_error": sourceStats.EstimatedOffsetErr,
|
||||
}
|
||||
acc.AddFields("chrony_sourcestats", fields, tags)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("chrony", func() telegraf.Input {
|
||||
return &Chrony{Timeout: config.Duration(3 * time.Second)}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,65 @@ import (
|
|||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
func TestGather(t *testing.T) {
|
||||
func TestGatherActivity(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
ActivityInfo: &fbchrony.Activity{
|
||||
Online: 34,
|
||||
Offline: 6,
|
||||
BurstOnline: 2,
|
||||
BurstOffline: 0,
|
||||
Unresolved: 5,
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"activity"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_activity",
|
||||
map[string]string{"source": addr},
|
||||
map[string]interface{}{
|
||||
"online": 34,
|
||||
"offline": 6,
|
||||
"burst_online": 2,
|
||||
"burst_offline": 0,
|
||||
"unresolved": 5,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherTracking(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
TrackingInfo: &fbchrony.Tracking{
|
||||
|
|
@ -46,8 +104,9 @@ func TestGather(t *testing.T) {
|
|||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Log: testutil.Logger{},
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"tracking"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
|
|
@ -64,6 +123,7 @@ func TestGather(t *testing.T) {
|
|||
metric.New(
|
||||
"chrony",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"reference_id": "A29FC87B",
|
||||
"leap_status": "not synchronized",
|
||||
"stratum": "3",
|
||||
|
|
@ -94,6 +154,499 @@ func TestGather(t *testing.T) {
|
|||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherServerStats(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
ServerStatInfo: &fbchrony.ServerStats{
|
||||
NTPHits: 2542,
|
||||
CMDHits: 112,
|
||||
NTPDrops: 42,
|
||||
CMDDrops: 8,
|
||||
LogDrops: 0,
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"serverstats"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_serverstats",
|
||||
map[string]string{"source": addr},
|
||||
map[string]interface{}{
|
||||
"ntp_hits": uint64(2542),
|
||||
"ntp_drops": uint64(42),
|
||||
"cmd_hits": uint64(112),
|
||||
"cmd_drops": uint64(8),
|
||||
"log_drops": uint64(0),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherServerStats2(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
ServerStatInfo: &fbchrony.ServerStats2{
|
||||
NTPHits: 2542,
|
||||
NKEHits: 5,
|
||||
CMDHits: 112,
|
||||
NTPDrops: 42,
|
||||
NKEDrops: 1,
|
||||
CMDDrops: 8,
|
||||
LogDrops: 0,
|
||||
NTPAuthHits: 9,
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"serverstats"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_serverstats",
|
||||
map[string]string{"source": addr},
|
||||
map[string]interface{}{
|
||||
"ntp_hits": uint64(2542),
|
||||
"ntp_drops": uint64(42),
|
||||
"ntp_auth_hits": uint64(9),
|
||||
"cmd_hits": uint64(112),
|
||||
"cmd_drops": uint64(8),
|
||||
"log_drops": uint64(0),
|
||||
"nke_hits": uint64(5),
|
||||
"nke_drops": uint64(1),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherServerStats3(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
ServerStatInfo: &fbchrony.ServerStats3{
|
||||
NTPHits: 2542,
|
||||
NKEHits: 5,
|
||||
CMDHits: 112,
|
||||
NTPDrops: 42,
|
||||
NKEDrops: 1,
|
||||
CMDDrops: 8,
|
||||
LogDrops: 0,
|
||||
NTPAuthHits: 9,
|
||||
NTPInterleavedHits: 28,
|
||||
NTPTimestamps: 69527,
|
||||
NTPSpanSeconds: 33,
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"serverstats"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_serverstats",
|
||||
map[string]string{"source": addr},
|
||||
map[string]interface{}{
|
||||
"ntp_hits": uint64(2542),
|
||||
"ntp_drops": uint64(42),
|
||||
"ntp_auth_hits": uint64(9),
|
||||
"ntp_interleaved_hits": uint64(28),
|
||||
"ntp_timestamps": uint64(69527),
|
||||
"ntp_span_seconds": uint64(33),
|
||||
"cmd_hits": uint64(112),
|
||||
"cmd_drops": uint64(8),
|
||||
"log_drops": uint64(0),
|
||||
"nke_hits": uint64(5),
|
||||
"nke_drops": uint64(1),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherSources(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
SourcesInfo: []source{
|
||||
{
|
||||
name: "ntp1.my.org",
|
||||
data: &fbchrony.SourceData{
|
||||
IPAddr: net.IPv4(192, 168, 0, 1),
|
||||
Poll: 64,
|
||||
Stratum: 16,
|
||||
State: fbchrony.SourceStateSync,
|
||||
Mode: fbchrony.SourceModePeer,
|
||||
Flags: 0,
|
||||
Reachability: 0,
|
||||
SinceSample: 0,
|
||||
OrigLatestMeas: 1.22354,
|
||||
LatestMeas: 1.22354,
|
||||
LatestMeasErr: 0.00423,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ntp2.my.org",
|
||||
data: &fbchrony.SourceData{
|
||||
IPAddr: net.IPv4(192, 168, 0, 2),
|
||||
Poll: 64,
|
||||
Stratum: 16,
|
||||
State: fbchrony.SourceStateSync,
|
||||
Mode: fbchrony.SourceModePeer,
|
||||
Flags: 0,
|
||||
Reachability: 0,
|
||||
SinceSample: 0,
|
||||
OrigLatestMeas: 0.17791,
|
||||
LatestMeas: 0.35445,
|
||||
LatestMeasErr: 0.01196,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ntp3.my.org",
|
||||
data: &fbchrony.SourceData{
|
||||
IPAddr: net.IPv4(192, 168, 0, 3),
|
||||
Poll: 512,
|
||||
Stratum: 1,
|
||||
State: fbchrony.SourceStateOutlier,
|
||||
Mode: fbchrony.SourceModePeer,
|
||||
Flags: 0,
|
||||
Reachability: 512,
|
||||
SinceSample: 377,
|
||||
OrigLatestMeas: 7.21158,
|
||||
LatestMeas: 7.21158,
|
||||
LatestMeasErr: 2.15453,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"sources"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_sources",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp1.my.org",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 0,
|
||||
"ip": "192.168.0.1",
|
||||
"poll": 64,
|
||||
"stratum": uint64(16),
|
||||
"state": "sync",
|
||||
"mode": "peer",
|
||||
"flags": uint64(0),
|
||||
"reachability": uint64(0),
|
||||
"sample": uint64(0),
|
||||
"latest_measurement": 1.22354,
|
||||
"latest_measurement_error": 0.00423,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
metric.New(
|
||||
"chrony_sources",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp2.my.org",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 1,
|
||||
"ip": "192.168.0.2",
|
||||
"poll": 64,
|
||||
"stratum": uint64(16),
|
||||
"state": "sync",
|
||||
"mode": "peer",
|
||||
"flags": uint64(0),
|
||||
"reachability": uint64(0),
|
||||
"sample": uint64(0),
|
||||
"latest_measurement": 0.35445,
|
||||
"latest_measurement_error": 0.01196,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
metric.New(
|
||||
"chrony_sources",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp3.my.org",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 2,
|
||||
"ip": "192.168.0.3",
|
||||
"poll": 512,
|
||||
"stratum": uint64(1),
|
||||
"state": "outlier",
|
||||
"mode": "peer",
|
||||
"flags": uint64(0),
|
||||
"reachability": uint64(512),
|
||||
"sample": uint64(377),
|
||||
"latest_measurement": 7.21158,
|
||||
"latest_measurement_error": 2.15453,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestGatherSourceStats(t *testing.T) {
|
||||
// Setup a mock server
|
||||
server := Server{
|
||||
SourcesInfo: []source{
|
||||
{
|
||||
name: "ntp1.my.org",
|
||||
stats: &fbchrony.SourceStats{
|
||||
RefID: 434354566,
|
||||
IPAddr: net.IPv4(192, 168, 0, 1),
|
||||
NSamples: 1254,
|
||||
NRuns: 16,
|
||||
SpanSeconds: 32,
|
||||
StandardDeviation: 0.0244,
|
||||
ResidFreqPPM: 0.0015,
|
||||
SkewPPM: 0.0001,
|
||||
EstimatedOffset: 0.0039,
|
||||
EstimatedOffsetErr: 0.0007,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ntp2.my.org",
|
||||
stats: &fbchrony.SourceStats{
|
||||
RefID: 70349595,
|
||||
IPAddr: net.IPv4(192, 168, 0, 2),
|
||||
NSamples: 23135,
|
||||
NRuns: 24,
|
||||
SpanSeconds: 3,
|
||||
StandardDeviation: 0.0099,
|
||||
ResidFreqPPM: 0.0188,
|
||||
SkewPPM: 0.0002,
|
||||
EstimatedOffset: 0.0104,
|
||||
EstimatedOffsetErr: 0.0021,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ntp3.my.org",
|
||||
stats: &fbchrony.SourceStats{
|
||||
RefID: 983490438,
|
||||
IPAddr: net.IPv4(192, 168, 0, 3),
|
||||
NSamples: 23,
|
||||
NRuns: 4,
|
||||
SpanSeconds: 193,
|
||||
StandardDeviation: 7.0586,
|
||||
ResidFreqPPM: 0.8320,
|
||||
SkewPPM: 0.0332,
|
||||
EstimatedOffset: 5.3345,
|
||||
EstimatedOffsetErr: 1.5437,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
addr, err := server.Listen(t)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + addr,
|
||||
Metrics: []string{"sourcestats"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Start the plugin, do a gather and stop everything
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, plugin.Start(&acc))
|
||||
defer plugin.Stop()
|
||||
require.NoError(t, plugin.Gather(&acc))
|
||||
plugin.Stop()
|
||||
server.Shutdown()
|
||||
|
||||
// Do the comparison
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"chrony_sourcestats",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp1.my.org",
|
||||
"reference_id": "19E3B986",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 0,
|
||||
"ip": "192.168.0.1",
|
||||
"samples": uint64(1254),
|
||||
"runs": uint64(16),
|
||||
"span_seconds": uint64(32),
|
||||
"stddev": 0.0244,
|
||||
"residual_frequency": 0.0015,
|
||||
"skew": 0.0001,
|
||||
"offset": 0.0039,
|
||||
"offset_error": 0.0007,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
metric.New(
|
||||
"chrony_sourcestats",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp2.my.org",
|
||||
"reference_id": "0431731B",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 1,
|
||||
"ip": "192.168.0.2",
|
||||
"samples": uint64(23135),
|
||||
"runs": uint64(24),
|
||||
"span_seconds": uint64(3),
|
||||
"stddev": 0.0099,
|
||||
"residual_frequency": 0.0188,
|
||||
"skew": 0.0002,
|
||||
"offset": 0.0104,
|
||||
"offset_error": 0.0021,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
metric.New(
|
||||
"chrony_sourcestats",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"peer": "ntp3.my.org",
|
||||
"reference_id": "3A9EDF86",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"index": 2,
|
||||
"ip": "192.168.0.3",
|
||||
"samples": uint64(23),
|
||||
"runs": uint64(4),
|
||||
"span_seconds": uint64(193),
|
||||
"stddev": 7.0586,
|
||||
"residual_frequency": 0.8320,
|
||||
"skew": 0.0332,
|
||||
"offset": 5.3345,
|
||||
"offset_error": 1.5437,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
options := []cmp.Option{
|
||||
// tests on linux with go1.20 will add a warning about code coverage, ignore that tag
|
||||
testutil.IgnoreTags("warning"),
|
||||
testutil.IgnoreTime(),
|
||||
cmpopts.EquateApprox(0.001, 0),
|
||||
}
|
||||
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
func TestIntegration(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
|
|
@ -112,10 +665,11 @@ func TestIntegration(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, container.Start(), "failed to start container")
|
||||
defer container.Terminate()
|
||||
addr := container.Address + ":" + container.Ports["323"]
|
||||
|
||||
// Setup the plugin
|
||||
plugin := &Chrony{
|
||||
Server: "udp://" + container.Address + ":" + container.Ports["323"],
|
||||
Server: "udp://" + addr,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
|
@ -131,6 +685,7 @@ func TestIntegration(t *testing.T) {
|
|||
metric.New(
|
||||
"chrony",
|
||||
map[string]string{
|
||||
"source": addr,
|
||||
"leap_status": "normal",
|
||||
"reference_id": "A29FC87B",
|
||||
"stratum": "4",
|
||||
|
|
@ -159,8 +714,17 @@ func TestIntegration(t *testing.T) {
|
|||
testutil.RequireMetricsStructureEqual(t, expected, actual, options...)
|
||||
}
|
||||
|
||||
type source struct {
|
||||
name string
|
||||
data *fbchrony.SourceData
|
||||
stats *fbchrony.SourceStats
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
TrackingInfo *fbchrony.Tracking
|
||||
ActivityInfo *fbchrony.Activity
|
||||
TrackingInfo *fbchrony.Tracking
|
||||
ServerStatInfo interface{}
|
||||
SourcesInfo []source
|
||||
|
||||
conn net.PacketConn
|
||||
}
|
||||
|
|
@ -203,7 +767,24 @@ func (s *Server) serve(t *testing.T) {
|
|||
}
|
||||
seqno := header.Sequence + 1
|
||||
|
||||
t.Logf("mock server: received request %d", header.Command)
|
||||
switch header.Command {
|
||||
case 14: // sources
|
||||
_, err := s.conn.WriteTo(s.encodeSourcesReply(seqno), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [sources]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [sources]: successfully wrote reply")
|
||||
}
|
||||
case 15: // source data
|
||||
var idx int32
|
||||
require.NoError(t, binary.Read(data, binary.BigEndian, &idx))
|
||||
_, err = s.conn.WriteTo(s.encodeSourceDataReply(seqno, idx), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [source data]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [source data]: successfully wrote reply")
|
||||
}
|
||||
case 33: // tracking
|
||||
_, err := s.conn.WriteTo(s.encodeTrackingReply(seqno), addr)
|
||||
if err != nil {
|
||||
|
|
@ -211,40 +792,67 @@ func (s *Server) serve(t *testing.T) {
|
|||
} else {
|
||||
t.Log("mock server [tracking]: successfully wrote reply")
|
||||
}
|
||||
case 34: // source stats
|
||||
var idx int32
|
||||
require.NoError(t, binary.Read(data, binary.BigEndian, &idx))
|
||||
_, err = s.conn.WriteTo(s.encodeSourceStatsReply(seqno, idx), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [source stats]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [source stats]: successfully wrote reply")
|
||||
}
|
||||
case 44: // activity
|
||||
_, err := s.conn.WriteTo(s.encodeActivityReply(seqno), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [activity]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [activity]: successfully wrote reply")
|
||||
}
|
||||
case 54: // server stats
|
||||
_, err := s.conn.WriteTo(s.encodeServerStatsReply(seqno), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [serverstats]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [serverstats]: successfully wrote reply")
|
||||
}
|
||||
case 65: // source name
|
||||
buf := make([]byte, 20)
|
||||
_, err := data.Read(buf)
|
||||
require.NoError(t, err)
|
||||
ip := decodeIP(buf)
|
||||
t.Logf("mock server [source name]: resolving %v", ip)
|
||||
_, err = s.conn.WriteTo(s.encodeSourceNameReply(seqno, ip), addr)
|
||||
if err != nil {
|
||||
t.Logf("mock server [source name]: writing reply failed: %v", err)
|
||||
} else {
|
||||
t.Log("mock server [source name]: successfully wrote reply")
|
||||
}
|
||||
default:
|
||||
t.Logf("mock server: unhandled command %v", header.Command)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) encodeActivityReply(sequence uint32) []byte {
|
||||
// Encode the header
|
||||
buf := encodeHeader(44, 12, 0, sequence) // activity request
|
||||
|
||||
// Encode data
|
||||
b := bytes.NewBuffer(buf)
|
||||
_ = binary.Write(b, binary.BigEndian, s.ActivityInfo)
|
||||
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
func (s *Server) encodeTrackingReply(sequence uint32) []byte {
|
||||
t := s.TrackingInfo
|
||||
|
||||
// Encode the header
|
||||
buf := []byte{
|
||||
0x06, // version 6
|
||||
0x02, // packet type 2: tracking
|
||||
0x00, // res1
|
||||
0x00, // res2
|
||||
0x00, 0x21, // command 33: tracking request
|
||||
0x00, 0x05, // reply 5: tracking reply
|
||||
0x00, 0x00, // status 0: success
|
||||
0x00, 0x00, // pad1
|
||||
0x00, 0x00, // pad2
|
||||
0x00, 0x00, // pad3
|
||||
}
|
||||
buf = binary.BigEndian.AppendUint32(buf, sequence) // sequence number
|
||||
buf = append(buf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) // pad 4 & 5
|
||||
buf := encodeHeader(33, 5, 0, sequence) // tracking request
|
||||
|
||||
// Encode data
|
||||
buf = binary.BigEndian.AppendUint32(buf, t.RefID)
|
||||
buf = append(buf, t.IPAddr.To16()...)
|
||||
if len(t.IPAddr) == 4 {
|
||||
buf = append(buf, 0x00, 0x01) // IPv4 address family
|
||||
} else {
|
||||
buf = append(buf, 0x00, 0x02) // IPv6 address family
|
||||
}
|
||||
buf = append(buf, 0x00, 0x00) // padding
|
||||
buf = append(buf, encodeIP(t.IPAddr)...)
|
||||
buf = binary.BigEndian.AppendUint16(buf, t.Stratum)
|
||||
buf = binary.BigEndian.AppendUint16(buf, t.LeapStatus)
|
||||
sec := uint64(t.RefTime.Unix())
|
||||
|
|
@ -265,6 +873,167 @@ func (s *Server) encodeTrackingReply(sequence uint32) []byte {
|
|||
return buf
|
||||
}
|
||||
|
||||
func (s *Server) encodeServerStatsReply(sequence uint32) []byte {
|
||||
var b *bytes.Buffer
|
||||
switch info := s.ServerStatInfo.(type) {
|
||||
case *fbchrony.ServerStats:
|
||||
// Encode the header
|
||||
buf := encodeHeader(54, 14, 0, sequence) // activity request
|
||||
|
||||
// Encode data
|
||||
b = bytes.NewBuffer(buf)
|
||||
_ = binary.Write(b, binary.BigEndian, info)
|
||||
case *fbchrony.ServerStats2:
|
||||
// Encode the header
|
||||
buf := encodeHeader(54, 22, 0, sequence) // activity request
|
||||
|
||||
// Encode data
|
||||
b = bytes.NewBuffer(buf)
|
||||
_ = binary.Write(b, binary.BigEndian, info)
|
||||
case *fbchrony.ServerStats3:
|
||||
// Encode the header
|
||||
buf := encodeHeader(54, 24, 0, sequence) // activity request
|
||||
|
||||
// Encode data
|
||||
b = bytes.NewBuffer(buf)
|
||||
_ = binary.Write(b, binary.BigEndian, info)
|
||||
}
|
||||
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
func (s *Server) encodeSourcesReply(sequence uint32) []byte {
|
||||
// Encode the header
|
||||
buf := encodeHeader(14, 2, 0, sequence) // sources request
|
||||
|
||||
// Encode data
|
||||
buf = binary.BigEndian.AppendUint32(buf, uint32(len(s.SourcesInfo))) // NSources
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func (s *Server) encodeSourceDataReply(sequence uint32, idx int32) []byte {
|
||||
if len(s.SourcesInfo) <= int(idx) {
|
||||
return encodeHeader(15, 3, 3, sequence) // status invalid
|
||||
}
|
||||
src := s.SourcesInfo[idx].data
|
||||
|
||||
// Encode the header
|
||||
buf := encodeHeader(15, 3, 0, sequence) // source data request
|
||||
|
||||
// Encode data
|
||||
buf = append(buf, encodeIP(src.IPAddr)...)
|
||||
buf = binary.BigEndian.AppendUint16(buf, uint16(src.Poll))
|
||||
buf = binary.BigEndian.AppendUint16(buf, src.Stratum)
|
||||
buf = binary.BigEndian.AppendUint16(buf, uint16(src.State))
|
||||
buf = binary.BigEndian.AppendUint16(buf, uint16(src.Mode))
|
||||
buf = binary.BigEndian.AppendUint16(buf, src.Flags)
|
||||
buf = binary.BigEndian.AppendUint16(buf, src.Reachability)
|
||||
buf = binary.BigEndian.AppendUint32(buf, src.SinceSample)
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.OrigLatestMeas))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.LatestMeas))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.LatestMeasErr))
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func (s *Server) encodeSourceStatsReply(sequence uint32, idx int32) []byte {
|
||||
if len(s.SourcesInfo) <= int(idx) {
|
||||
return encodeHeader(34, 6, 3, sequence) // status invalid
|
||||
}
|
||||
src := s.SourcesInfo[idx].stats
|
||||
|
||||
// Encode the header
|
||||
buf := encodeHeader(15, 6, 0, sequence) // source data request
|
||||
|
||||
// Encode data
|
||||
buf = binary.BigEndian.AppendUint32(buf, src.RefID)
|
||||
buf = append(buf, encodeIP(src.IPAddr)...)
|
||||
buf = binary.BigEndian.AppendUint32(buf, src.NSamples)
|
||||
buf = binary.BigEndian.AppendUint32(buf, src.NRuns)
|
||||
buf = binary.BigEndian.AppendUint32(buf, src.SpanSeconds)
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.StandardDeviation))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.ResidFreqPPM))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.SkewPPM))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.EstimatedOffset))
|
||||
buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.EstimatedOffsetErr))
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func (s *Server) encodeSourceNameReply(sequence uint32, ip net.IP) []byte {
|
||||
// Encode the header
|
||||
buf := encodeHeader(65, 19, 0, sequence) // source name request
|
||||
|
||||
// Find the correct source
|
||||
var name []byte
|
||||
for _, src := range s.SourcesInfo {
|
||||
if src.data != nil && src.data.IPAddr.Equal(ip) || src.stats != nil && src.stats.IPAddr.Equal(ip) {
|
||||
name = []byte(src.name)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Encode data
|
||||
if len(name) > 256 {
|
||||
buf = append(buf, name[:256]...)
|
||||
} else {
|
||||
buf = append(buf, name...)
|
||||
buf = append(buf, make([]byte, 256-len(name))...)
|
||||
}
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func encodeHeader(command, replyType, status uint16, seqnr uint32) []byte {
|
||||
buf := []byte{
|
||||
0x06, // version 6
|
||||
0x02, // packet type 2: reply
|
||||
0x00, // res1
|
||||
0x00, // res2
|
||||
}
|
||||
buf = binary.BigEndian.AppendUint16(buf, command) // command
|
||||
buf = binary.BigEndian.AppendUint16(buf, replyType) // reply type
|
||||
buf = binary.BigEndian.AppendUint16(buf, status) // status 0: success
|
||||
buf = append(buf, []byte{
|
||||
0x00, 0x00, // pad1
|
||||
0x00, 0x00, // pad2
|
||||
0x00, 0x00, // pad3
|
||||
}...)
|
||||
buf = binary.BigEndian.AppendUint32(buf, seqnr) // sequence number
|
||||
buf = append(buf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) // pad 4 & 5
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func encodeIP(addr net.IP) []byte {
|
||||
var buf []byte
|
||||
|
||||
buf = append(buf, addr.To16()...)
|
||||
if len(addr) == 4 {
|
||||
buf = append(buf, 0x00, 0x01) // IPv4 address family
|
||||
} else {
|
||||
buf = append(buf, 0x00, 0x02) // IPv6 address family
|
||||
}
|
||||
buf = append(buf, 0x00, 0x00) // padding
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func decodeIP(buf []byte) net.IP {
|
||||
if len(buf) != 20 {
|
||||
panic("invalid length for IP")
|
||||
}
|
||||
|
||||
addr := net.IP(buf[0:16])
|
||||
family := binary.BigEndian.Uint16(buf[16:18])
|
||||
if family == 1 {
|
||||
return addr.To4()
|
||||
}
|
||||
|
||||
return addr
|
||||
}
|
||||
|
||||
// Modified based on https://github.com/mlichvar/chrony/blob/master/util.c
|
||||
const (
|
||||
floatExpBits = int32(7)
|
||||
|
|
|
|||
|
|
@ -12,3 +12,12 @@
|
|||
## Try to resolve received addresses to host-names via DNS lookups
|
||||
## Disabled by default to avoid DNS queries especially for slow DNS servers.
|
||||
# dns_lookup = false
|
||||
|
||||
## Metrics to query named according to chronyc commands
|
||||
## Available settings are:
|
||||
## activity -- number of peers online or offline
|
||||
## tracking -- information about system's clock performance
|
||||
## serverstats -- chronyd server statistics
|
||||
## sources -- extended information about peers
|
||||
## sourcestats -- statistics on peers
|
||||
# metrics = ["tracking"]
|
||||
|
|
|
|||
Loading…
Reference in New Issue