diff --git a/plugins/inputs/chrony/README.md b/plugins/inputs/chrony/README.md index 38644c497..8ad6767bb 100644 --- a/plugins/inputs/chrony/README.md +++ b/plugins/inputs/chrony/README.md @@ -31,6 +31,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Try to resolve received addresses to host-names via DNS lookups ## Disabled by default to avoid DNS queries especially for slow DNS servers. # dns_lookup = false + + ## Metrics to query named according to chronyc commands + ## Available settings are: + ## activity -- number of peers online or offline + ## tracking -- information about system's clock performance + ## serverstats -- chronyd server statistics + ## sources -- extended information about peers + ## sourcestats -- statistics on peers + # metrics = ["tracking"] ``` ## Metrics diff --git a/plugins/inputs/chrony/chrony.go b/plugins/inputs/chrony/chrony.go index 551eda3cb..df0ac440f 100644 --- a/plugins/inputs/chrony/chrony.go +++ b/plugins/inputs/chrony/chrony.go @@ -2,13 +2,14 @@ package chrony import ( + "bytes" _ "embed" "errors" "fmt" "net" "net/url" "strconv" - "strings" + "syscall" "time" fbchrony "github.com/facebook/time/ntp/chrony" @@ -25,10 +26,12 @@ type Chrony struct { Server string `toml:"server"` Timeout config.Duration `toml:"timeout"` DNSLookup bool `toml:"dns_lookup"` + Metrics []string `toml:"metrics"` Log telegraf.Logger `toml:"-"` conn net.Conn client *fbchrony.Client + source string } func (*Chrony) SampleConfig() string { @@ -36,6 +39,7 @@ func (*Chrony) SampleConfig() string { } func (c *Chrony) Init() error { + // Use the configured server, if none set, we try to guess it in Start() if c.Server != "" { // Check the specified server address u, err := url.Parse(c.Server) @@ -61,6 +65,19 @@ func (c *Chrony) Init() error { c.Server = u.String() } + // Check the given metrics + if len(c.Metrics) == 0 { + c.Metrics = append(c.Metrics, "tracking") + } + for _, m := range c.Metrics { + switch m { + case "activity", "tracking", "serverstats", "sources", "sourcestats": + // Do nothing as those are valid + default: + return fmt.Errorf("invalid metric setting %q", m) + } + } + return nil } @@ -78,12 +95,14 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error { return fmt.Errorf("dialing %q failed: %w", c.Server, err) } c.conn = conn + c.source = u.Path case "udp": conn, err := net.DialTimeout("udp", u.Host, time.Duration(c.Timeout)) if err != nil { return fmt.Errorf("dialing %q failed: %w", c.Server, err) } c.conn = conn + c.source = u.Host } } else { // If no server is given, reproduce chronyc's behavior @@ -112,26 +131,75 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error { func (c *Chrony) Stop() { if c.conn != nil { - if err := c.conn.Close(); err != nil { + if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) { c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err) } } } func (c *Chrony) Gather(acc telegraf.Accumulator) error { + for _, m := range c.Metrics { + switch m { + case "activity": + acc.AddError(c.gatherActivity(acc)) + case "tracking": + acc.AddError(c.gatherTracking(acc)) + case "serverstats": + acc.AddError(c.gatherServerStats(acc)) + case "sources": + acc.AddError(c.gatherSources(acc)) + case "sourcestats": + acc.AddError(c.gatherSourceStats(acc)) + default: + return fmt.Errorf("invalid metric setting %q", m) + } + } + + return nil +} + +func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error { + req := fbchrony.NewActivityPacket() + r, err := c.client.Communicate(req) + if err != nil { + return fmt.Errorf("querying activity data failed: %w", err) + } + resp, ok := r.(*fbchrony.ReplyActivity) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for activity data", r) + } + + tags := map[string]string{} + if c.source != "" { + tags["source"] = c.source + } + + fields := map[string]interface{}{ + "online": resp.Online, + "offline": resp.Offline, + "burst_online": resp.BurstOnline, + "burst_offline": resp.BurstOffline, + "unresolved": resp.Unresolved, + } + acc.AddFields("chrony_activity", fields, tags) + + return nil +} + +func (c *Chrony) gatherTracking(acc telegraf.Accumulator) error { req := fbchrony.NewTrackingPacket() - resp, err := c.client.Communicate(req) + r, err := c.client.Communicate(req) if err != nil { return fmt.Errorf("querying tracking data failed: %w", err) } - tracking, ok := resp.(*fbchrony.ReplyTracking) + resp, ok := r.(*fbchrony.ReplyTracking) if !ok { - return fmt.Errorf("got unexpected response type %T while waiting for tracking data", resp) + return fmt.Errorf("got unexpected response type %T while waiting for tracking data", r) } // according to https://github.com/mlichvar/chrony/blob/e11b518a1ffa704986fb1f1835c425844ba248ef/ntp.h#L70 var leapStatus string - switch tracking.LeapStatus { + switch resp.LeapStatus { case 0: leapStatus = "normal" case 1: @@ -144,24 +212,229 @@ func (c *Chrony) Gather(acc telegraf.Accumulator) error { tags := map[string]string{ "leap_status": leapStatus, - "reference_id": strings.ToUpper(strconv.FormatUint(uint64(tracking.RefID), 16)), - "stratum": strconv.FormatUint(uint64(tracking.Stratum), 10), + "reference_id": fbchrony.RefidAsHEX(resp.RefID), + "stratum": strconv.FormatUint(uint64(resp.Stratum), 10), } + if c.source != "" { + tags["source"] = c.source + } + fields := map[string]interface{}{ - "frequency": tracking.FreqPPM, - "system_time": tracking.CurrentCorrection, - "last_offset": tracking.LastOffset, - "residual_freq": tracking.ResidFreqPPM, - "rms_offset": tracking.RMSOffset, - "root_delay": tracking.RootDelay, - "root_dispersion": tracking.RootDispersion, - "skew": tracking.SkewPPM, - "update_interval": tracking.LastUpdateInterval, + "frequency": resp.FreqPPM, + "system_time": resp.CurrentCorrection, + "last_offset": resp.LastOffset, + "residual_freq": resp.ResidFreqPPM, + "rms_offset": resp.RMSOffset, + "root_delay": resp.RootDelay, + "root_dispersion": resp.RootDispersion, + "skew": resp.SkewPPM, + "update_interval": resp.LastUpdateInterval, } acc.AddFields("chrony", fields, tags) return nil } + +func (c *Chrony) gatherServerStats(acc telegraf.Accumulator) error { + req := fbchrony.NewServerStatsPacket() + r, err := c.client.Communicate(req) + if err != nil { + return fmt.Errorf("querying server statistics failed: %w", err) + } + + tags := map[string]string{} + if c.source != "" { + tags["source"] = c.source + } + + var fields map[string]interface{} + switch resp := r.(type) { + case *fbchrony.ReplyServerStats: + fields = map[string]interface{}{ + "ntp_hits": resp.NTPHits, + "ntp_drops": resp.NTPDrops, + "cmd_hits": resp.CMDHits, + "cmd_drops": resp.CMDDrops, + "log_drops": resp.LogDrops, + } + case *fbchrony.ReplyServerStats2: + fields = map[string]interface{}{ + "ntp_hits": resp.NTPHits, + "ntp_drops": resp.NTPDrops, + "ntp_auth_hits": resp.NTPAuthHits, + "cmd_hits": resp.CMDHits, + "cmd_drops": resp.CMDDrops, + "log_drops": resp.LogDrops, + "nke_hits": resp.NKEHits, + "nke_drops": resp.NKEDrops, + } + case *fbchrony.ReplyServerStats3: + fields = map[string]interface{}{ + "ntp_hits": resp.NTPHits, + "ntp_drops": resp.NTPDrops, + "ntp_auth_hits": resp.NTPAuthHits, + "ntp_interleaved_hits": resp.NTPInterleavedHits, + "ntp_timestamps": resp.NTPTimestamps, + "ntp_span_seconds": resp.NTPSpanSeconds, + "cmd_hits": resp.CMDHits, + "cmd_drops": resp.CMDDrops, + "log_drops": resp.LogDrops, + "nke_hits": resp.NKEHits, + "nke_drops": resp.NKEDrops, + } + default: + return fmt.Errorf("got unexpected response type %T while waiting for server statistics", r) + } + + acc.AddFields("chrony_serverstats", fields, tags) + + return nil +} + +func (c *Chrony) gatherSources(acc telegraf.Accumulator) error { + sourcesReq := fbchrony.NewSourcesPacket() + sourcesRaw, err := c.client.Communicate(sourcesReq) + if err != nil { + return fmt.Errorf("querying sources failed: %w", err) + } + + sourcesResp, ok := sourcesRaw.(*fbchrony.ReplySources) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for sources", sourcesRaw) + } + + for idx := int32(0); int(idx) < sourcesResp.NSources; idx++ { + // Getting the source data + sourceDataReq := fbchrony.NewSourceDataPacket(idx) + sourceDataRaw, err := c.client.Communicate(sourceDataReq) + if err != nil { + return fmt.Errorf("querying data for source %d failed: %w", idx, err) + } + sourceData, ok := sourceDataRaw.(*fbchrony.ReplySourceData) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for source data", sourceDataRaw) + } + + // Trying to resolve the source name + sourceNameReq := fbchrony.NewNTPSourceNamePacket(sourceData.IPAddr) + sourceNameRaw, err := c.client.Communicate(sourceNameReq) + if err != nil { + return fmt.Errorf("querying name of source %d failed: %w", idx, err) + } + sourceName, ok := sourceNameRaw.(*fbchrony.ReplyNTPSourceName) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for source name", sourceNameRaw) + } + + // Cut the string at null termination + var peer string + if termidx := bytes.Index(sourceName.Name[:], []byte{0}); termidx >= 0 { + peer = string(sourceName.Name[:termidx]) + } else { + peer = string(sourceName.Name[:]) + } + + if peer == "" { + peer = sourceData.IPAddr.String() + } + + tags := map[string]string{ + "peer": peer, + } + if c.source != "" { + tags["source"] = c.source + } + + fields := map[string]interface{}{ + "index": idx, + "ip": sourceData.IPAddr.String(), + "poll": sourceData.Poll, + "stratum": sourceData.Stratum, + "state": sourceData.State.String(), + "mode": sourceData.Mode.String(), + "flags": sourceData.Flags, + "reachability": sourceData.Reachability, + "sample": sourceData.SinceSample, + "latest_measurement": sourceData.LatestMeas, + "latest_measurement_error": sourceData.LatestMeasErr, + } + acc.AddFields("chrony_sources", fields, tags) + } + return nil +} + +func (c *Chrony) gatherSourceStats(acc telegraf.Accumulator) error { + sourcesReq := fbchrony.NewSourcesPacket() + sourcesRaw, err := c.client.Communicate(sourcesReq) + if err != nil { + return fmt.Errorf("querying sources failed: %w", err) + } + + sourcesResp, ok := sourcesRaw.(*fbchrony.ReplySources) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for sources", sourcesRaw) + } + + for idx := int32(0); int(idx) < sourcesResp.NSources; idx++ { + // Getting the source data + sourceStatsReq := fbchrony.NewSourceStatsPacket(idx) + sourceStatsRaw, err := c.client.Communicate(sourceStatsReq) + if err != nil { + return fmt.Errorf("querying data for source %d failed: %w", idx, err) + } + sourceStats, ok := sourceStatsRaw.(*fbchrony.ReplySourceStats) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for source data", sourceStatsRaw) + } + + // Trying to resolve the source name + sourceNameReq := fbchrony.NewNTPSourceNamePacket(sourceStats.IPAddr) + sourceNameRaw, err := c.client.Communicate(sourceNameReq) + if err != nil { + return fmt.Errorf("querying name of source %d failed: %w", idx, err) + } + sourceName, ok := sourceNameRaw.(*fbchrony.ReplyNTPSourceName) + if !ok { + return fmt.Errorf("got unexpected response type %T while waiting for source name", sourceNameRaw) + } + + // Cut the string at null termination + var peer string + if termidx := bytes.Index(sourceName.Name[:], []byte{0}); termidx >= 0 { + peer = string(sourceName.Name[:termidx]) + } else { + peer = string(sourceName.Name[:]) + } + + if peer == "" { + peer = sourceStats.IPAddr.String() + } + + tags := map[string]string{ + "reference_id": fbchrony.RefidAsHEX(sourceStats.RefID), + "peer": peer, + } + if c.source != "" { + tags["source"] = c.source + } + + fields := map[string]interface{}{ + "index": idx, + "ip": sourceStats.IPAddr.String(), + "samples": sourceStats.NSamples, + "runs": sourceStats.NRuns, + "span_seconds": sourceStats.SpanSeconds, + "stddev": sourceStats.StandardDeviation, + "residual_frequency": sourceStats.ResidFreqPPM, + "skew": sourceStats.SkewPPM, + "offset": sourceStats.EstimatedOffset, + "offset_error": sourceStats.EstimatedOffsetErr, + } + acc.AddFields("chrony_sourcestats", fields, tags) + } + return nil +} + func init() { inputs.Add("chrony", func() telegraf.Input { return &Chrony{Timeout: config.Duration(3 * time.Second)} diff --git a/plugins/inputs/chrony/chrony_test.go b/plugins/inputs/chrony/chrony_test.go index ece6b68b2..efd0d0b0f 100644 --- a/plugins/inputs/chrony/chrony_test.go +++ b/plugins/inputs/chrony/chrony_test.go @@ -20,7 +20,65 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func TestGather(t *testing.T) { +func TestGatherActivity(t *testing.T) { + // Setup a mock server + server := Server{ + ActivityInfo: &fbchrony.Activity{ + Online: 34, + Offline: 6, + BurstOnline: 2, + BurstOffline: 0, + Unresolved: 5, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"activity"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_activity", + map[string]string{"source": addr}, + map[string]interface{}{ + "online": 34, + "offline": 6, + "burst_online": 2, + "burst_offline": 0, + "unresolved": 5, + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestGatherTracking(t *testing.T) { // Setup a mock server server := Server{ TrackingInfo: &fbchrony.Tracking{ @@ -46,8 +104,9 @@ func TestGather(t *testing.T) { // Setup the plugin plugin := &Chrony{ - Server: "udp://" + addr, - Log: testutil.Logger{}, + Server: "udp://" + addr, + Metrics: []string{"tracking"}, + Log: testutil.Logger{}, } require.NoError(t, plugin.Init()) @@ -64,6 +123,7 @@ func TestGather(t *testing.T) { metric.New( "chrony", map[string]string{ + "source": addr, "reference_id": "A29FC87B", "leap_status": "not synchronized", "stratum": "3", @@ -94,6 +154,499 @@ func TestGather(t *testing.T) { testutil.RequireMetricsEqual(t, expected, actual, options...) } +func TestGatherServerStats(t *testing.T) { + // Setup a mock server + server := Server{ + ServerStatInfo: &fbchrony.ServerStats{ + NTPHits: 2542, + CMDHits: 112, + NTPDrops: 42, + CMDDrops: 8, + LogDrops: 0, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"serverstats"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_serverstats", + map[string]string{"source": addr}, + map[string]interface{}{ + "ntp_hits": uint64(2542), + "ntp_drops": uint64(42), + "cmd_hits": uint64(112), + "cmd_drops": uint64(8), + "log_drops": uint64(0), + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestGatherServerStats2(t *testing.T) { + // Setup a mock server + server := Server{ + ServerStatInfo: &fbchrony.ServerStats2{ + NTPHits: 2542, + NKEHits: 5, + CMDHits: 112, + NTPDrops: 42, + NKEDrops: 1, + CMDDrops: 8, + LogDrops: 0, + NTPAuthHits: 9, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"serverstats"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_serverstats", + map[string]string{"source": addr}, + map[string]interface{}{ + "ntp_hits": uint64(2542), + "ntp_drops": uint64(42), + "ntp_auth_hits": uint64(9), + "cmd_hits": uint64(112), + "cmd_drops": uint64(8), + "log_drops": uint64(0), + "nke_hits": uint64(5), + "nke_drops": uint64(1), + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestGatherServerStats3(t *testing.T) { + // Setup a mock server + server := Server{ + ServerStatInfo: &fbchrony.ServerStats3{ + NTPHits: 2542, + NKEHits: 5, + CMDHits: 112, + NTPDrops: 42, + NKEDrops: 1, + CMDDrops: 8, + LogDrops: 0, + NTPAuthHits: 9, + NTPInterleavedHits: 28, + NTPTimestamps: 69527, + NTPSpanSeconds: 33, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"serverstats"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_serverstats", + map[string]string{"source": addr}, + map[string]interface{}{ + "ntp_hits": uint64(2542), + "ntp_drops": uint64(42), + "ntp_auth_hits": uint64(9), + "ntp_interleaved_hits": uint64(28), + "ntp_timestamps": uint64(69527), + "ntp_span_seconds": uint64(33), + "cmd_hits": uint64(112), + "cmd_drops": uint64(8), + "log_drops": uint64(0), + "nke_hits": uint64(5), + "nke_drops": uint64(1), + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestGatherSources(t *testing.T) { + // Setup a mock server + server := Server{ + SourcesInfo: []source{ + { + name: "ntp1.my.org", + data: &fbchrony.SourceData{ + IPAddr: net.IPv4(192, 168, 0, 1), + Poll: 64, + Stratum: 16, + State: fbchrony.SourceStateSync, + Mode: fbchrony.SourceModePeer, + Flags: 0, + Reachability: 0, + SinceSample: 0, + OrigLatestMeas: 1.22354, + LatestMeas: 1.22354, + LatestMeasErr: 0.00423, + }, + }, + { + name: "ntp2.my.org", + data: &fbchrony.SourceData{ + IPAddr: net.IPv4(192, 168, 0, 2), + Poll: 64, + Stratum: 16, + State: fbchrony.SourceStateSync, + Mode: fbchrony.SourceModePeer, + Flags: 0, + Reachability: 0, + SinceSample: 0, + OrigLatestMeas: 0.17791, + LatestMeas: 0.35445, + LatestMeasErr: 0.01196, + }, + }, + { + name: "ntp3.my.org", + data: &fbchrony.SourceData{ + IPAddr: net.IPv4(192, 168, 0, 3), + Poll: 512, + Stratum: 1, + State: fbchrony.SourceStateOutlier, + Mode: fbchrony.SourceModePeer, + Flags: 0, + Reachability: 512, + SinceSample: 377, + OrigLatestMeas: 7.21158, + LatestMeas: 7.21158, + LatestMeasErr: 2.15453, + }, + }, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"sources"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_sources", + map[string]string{ + "source": addr, + "peer": "ntp1.my.org", + }, + map[string]interface{}{ + "index": 0, + "ip": "192.168.0.1", + "poll": 64, + "stratum": uint64(16), + "state": "sync", + "mode": "peer", + "flags": uint64(0), + "reachability": uint64(0), + "sample": uint64(0), + "latest_measurement": 1.22354, + "latest_measurement_error": 0.00423, + }, + time.Unix(0, 0), + ), + metric.New( + "chrony_sources", + map[string]string{ + "source": addr, + "peer": "ntp2.my.org", + }, + map[string]interface{}{ + "index": 1, + "ip": "192.168.0.2", + "poll": 64, + "stratum": uint64(16), + "state": "sync", + "mode": "peer", + "flags": uint64(0), + "reachability": uint64(0), + "sample": uint64(0), + "latest_measurement": 0.35445, + "latest_measurement_error": 0.01196, + }, + time.Unix(0, 0), + ), + metric.New( + "chrony_sources", + map[string]string{ + "source": addr, + "peer": "ntp3.my.org", + }, + map[string]interface{}{ + "index": 2, + "ip": "192.168.0.3", + "poll": 512, + "stratum": uint64(1), + "state": "outlier", + "mode": "peer", + "flags": uint64(0), + "reachability": uint64(512), + "sample": uint64(377), + "latest_measurement": 7.21158, + "latest_measurement_error": 2.15453, + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + +func TestGatherSourceStats(t *testing.T) { + // Setup a mock server + server := Server{ + SourcesInfo: []source{ + { + name: "ntp1.my.org", + stats: &fbchrony.SourceStats{ + RefID: 434354566, + IPAddr: net.IPv4(192, 168, 0, 1), + NSamples: 1254, + NRuns: 16, + SpanSeconds: 32, + StandardDeviation: 0.0244, + ResidFreqPPM: 0.0015, + SkewPPM: 0.0001, + EstimatedOffset: 0.0039, + EstimatedOffsetErr: 0.0007, + }, + }, + { + name: "ntp2.my.org", + stats: &fbchrony.SourceStats{ + RefID: 70349595, + IPAddr: net.IPv4(192, 168, 0, 2), + NSamples: 23135, + NRuns: 24, + SpanSeconds: 3, + StandardDeviation: 0.0099, + ResidFreqPPM: 0.0188, + SkewPPM: 0.0002, + EstimatedOffset: 0.0104, + EstimatedOffsetErr: 0.0021, + }, + }, + { + name: "ntp3.my.org", + stats: &fbchrony.SourceStats{ + RefID: 983490438, + IPAddr: net.IPv4(192, 168, 0, 3), + NSamples: 23, + NRuns: 4, + SpanSeconds: 193, + StandardDeviation: 7.0586, + ResidFreqPPM: 0.8320, + SkewPPM: 0.0332, + EstimatedOffset: 5.3345, + EstimatedOffsetErr: 1.5437, + }, + }, + }, + } + addr, err := server.Listen(t) + require.NoError(t, err) + defer server.Shutdown() + + // Setup the plugin + plugin := &Chrony{ + Server: "udp://" + addr, + Metrics: []string{"sourcestats"}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Start the plugin, do a gather and stop everything + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Shutdown() + + // Do the comparison + expected := []telegraf.Metric{ + metric.New( + "chrony_sourcestats", + map[string]string{ + "source": addr, + "peer": "ntp1.my.org", + "reference_id": "19E3B986", + }, + map[string]interface{}{ + "index": 0, + "ip": "192.168.0.1", + "samples": uint64(1254), + "runs": uint64(16), + "span_seconds": uint64(32), + "stddev": 0.0244, + "residual_frequency": 0.0015, + "skew": 0.0001, + "offset": 0.0039, + "offset_error": 0.0007, + }, + time.Unix(0, 0), + ), + metric.New( + "chrony_sourcestats", + map[string]string{ + "source": addr, + "peer": "ntp2.my.org", + "reference_id": "0431731B", + }, + map[string]interface{}{ + "index": 1, + "ip": "192.168.0.2", + "samples": uint64(23135), + "runs": uint64(24), + "span_seconds": uint64(3), + "stddev": 0.0099, + "residual_frequency": 0.0188, + "skew": 0.0002, + "offset": 0.0104, + "offset_error": 0.0021, + }, + time.Unix(0, 0), + ), + metric.New( + "chrony_sourcestats", + map[string]string{ + "source": addr, + "peer": "ntp3.my.org", + "reference_id": "3A9EDF86", + }, + map[string]interface{}{ + "index": 2, + "ip": "192.168.0.3", + "samples": uint64(23), + "runs": uint64(4), + "span_seconds": uint64(193), + "stddev": 7.0586, + "residual_frequency": 0.8320, + "skew": 0.0332, + "offset": 5.3345, + "offset_error": 1.5437, + }, + time.Unix(0, 0), + ), + } + + options := []cmp.Option{ + // tests on linux with go1.20 will add a warning about code coverage, ignore that tag + testutil.IgnoreTags("warning"), + testutil.IgnoreTime(), + cmpopts.EquateApprox(0.001, 0), + } + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) +} + func TestIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -112,10 +665,11 @@ func TestIntegration(t *testing.T) { } require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() + addr := container.Address + ":" + container.Ports["323"] // Setup the plugin plugin := &Chrony{ - Server: "udp://" + container.Address + ":" + container.Ports["323"], + Server: "udp://" + addr, Log: testutil.Logger{}, } require.NoError(t, plugin.Init()) @@ -131,6 +685,7 @@ func TestIntegration(t *testing.T) { metric.New( "chrony", map[string]string{ + "source": addr, "leap_status": "normal", "reference_id": "A29FC87B", "stratum": "4", @@ -159,8 +714,17 @@ func TestIntegration(t *testing.T) { testutil.RequireMetricsStructureEqual(t, expected, actual, options...) } +type source struct { + name string + data *fbchrony.SourceData + stats *fbchrony.SourceStats +} + type Server struct { - TrackingInfo *fbchrony.Tracking + ActivityInfo *fbchrony.Activity + TrackingInfo *fbchrony.Tracking + ServerStatInfo interface{} + SourcesInfo []source conn net.PacketConn } @@ -203,7 +767,24 @@ func (s *Server) serve(t *testing.T) { } seqno := header.Sequence + 1 + t.Logf("mock server: received request %d", header.Command) switch header.Command { + case 14: // sources + _, err := s.conn.WriteTo(s.encodeSourcesReply(seqno), addr) + if err != nil { + t.Logf("mock server [sources]: writing reply failed: %v", err) + } else { + t.Log("mock server [sources]: successfully wrote reply") + } + case 15: // source data + var idx int32 + require.NoError(t, binary.Read(data, binary.BigEndian, &idx)) + _, err = s.conn.WriteTo(s.encodeSourceDataReply(seqno, idx), addr) + if err != nil { + t.Logf("mock server [source data]: writing reply failed: %v", err) + } else { + t.Log("mock server [source data]: successfully wrote reply") + } case 33: // tracking _, err := s.conn.WriteTo(s.encodeTrackingReply(seqno), addr) if err != nil { @@ -211,40 +792,67 @@ func (s *Server) serve(t *testing.T) { } else { t.Log("mock server [tracking]: successfully wrote reply") } + case 34: // source stats + var idx int32 + require.NoError(t, binary.Read(data, binary.BigEndian, &idx)) + _, err = s.conn.WriteTo(s.encodeSourceStatsReply(seqno, idx), addr) + if err != nil { + t.Logf("mock server [source stats]: writing reply failed: %v", err) + } else { + t.Log("mock server [source stats]: successfully wrote reply") + } + case 44: // activity + _, err := s.conn.WriteTo(s.encodeActivityReply(seqno), addr) + if err != nil { + t.Logf("mock server [activity]: writing reply failed: %v", err) + } else { + t.Log("mock server [activity]: successfully wrote reply") + } + case 54: // server stats + _, err := s.conn.WriteTo(s.encodeServerStatsReply(seqno), addr) + if err != nil { + t.Logf("mock server [serverstats]: writing reply failed: %v", err) + } else { + t.Log("mock server [serverstats]: successfully wrote reply") + } + case 65: // source name + buf := make([]byte, 20) + _, err := data.Read(buf) + require.NoError(t, err) + ip := decodeIP(buf) + t.Logf("mock server [source name]: resolving %v", ip) + _, err = s.conn.WriteTo(s.encodeSourceNameReply(seqno, ip), addr) + if err != nil { + t.Logf("mock server [source name]: writing reply failed: %v", err) + } else { + t.Log("mock server [source name]: successfully wrote reply") + } default: t.Logf("mock server: unhandled command %v", header.Command) } } } +func (s *Server) encodeActivityReply(sequence uint32) []byte { + // Encode the header + buf := encodeHeader(44, 12, 0, sequence) // activity request + + // Encode data + b := bytes.NewBuffer(buf) + _ = binary.Write(b, binary.BigEndian, s.ActivityInfo) + + return b.Bytes() +} + func (s *Server) encodeTrackingReply(sequence uint32) []byte { t := s.TrackingInfo // Encode the header - buf := []byte{ - 0x06, // version 6 - 0x02, // packet type 2: tracking - 0x00, // res1 - 0x00, // res2 - 0x00, 0x21, // command 33: tracking request - 0x00, 0x05, // reply 5: tracking reply - 0x00, 0x00, // status 0: success - 0x00, 0x00, // pad1 - 0x00, 0x00, // pad2 - 0x00, 0x00, // pad3 - } - buf = binary.BigEndian.AppendUint32(buf, sequence) // sequence number - buf = append(buf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) // pad 4 & 5 + buf := encodeHeader(33, 5, 0, sequence) // tracking request // Encode data buf = binary.BigEndian.AppendUint32(buf, t.RefID) - buf = append(buf, t.IPAddr.To16()...) - if len(t.IPAddr) == 4 { - buf = append(buf, 0x00, 0x01) // IPv4 address family - } else { - buf = append(buf, 0x00, 0x02) // IPv6 address family - } - buf = append(buf, 0x00, 0x00) // padding + buf = append(buf, encodeIP(t.IPAddr)...) buf = binary.BigEndian.AppendUint16(buf, t.Stratum) buf = binary.BigEndian.AppendUint16(buf, t.LeapStatus) sec := uint64(t.RefTime.Unix()) @@ -265,6 +873,167 @@ func (s *Server) encodeTrackingReply(sequence uint32) []byte { return buf } +func (s *Server) encodeServerStatsReply(sequence uint32) []byte { + var b *bytes.Buffer + switch info := s.ServerStatInfo.(type) { + case *fbchrony.ServerStats: + // Encode the header + buf := encodeHeader(54, 14, 0, sequence) // activity request + + // Encode data + b = bytes.NewBuffer(buf) + _ = binary.Write(b, binary.BigEndian, info) + case *fbchrony.ServerStats2: + // Encode the header + buf := encodeHeader(54, 22, 0, sequence) // activity request + + // Encode data + b = bytes.NewBuffer(buf) + _ = binary.Write(b, binary.BigEndian, info) + case *fbchrony.ServerStats3: + // Encode the header + buf := encodeHeader(54, 24, 0, sequence) // activity request + + // Encode data + b = bytes.NewBuffer(buf) + _ = binary.Write(b, binary.BigEndian, info) + } + + return b.Bytes() +} + +func (s *Server) encodeSourcesReply(sequence uint32) []byte { + // Encode the header + buf := encodeHeader(14, 2, 0, sequence) // sources request + + // Encode data + buf = binary.BigEndian.AppendUint32(buf, uint32(len(s.SourcesInfo))) // NSources + + return buf +} + +func (s *Server) encodeSourceDataReply(sequence uint32, idx int32) []byte { + if len(s.SourcesInfo) <= int(idx) { + return encodeHeader(15, 3, 3, sequence) // status invalid + } + src := s.SourcesInfo[idx].data + + // Encode the header + buf := encodeHeader(15, 3, 0, sequence) // source data request + + // Encode data + buf = append(buf, encodeIP(src.IPAddr)...) + buf = binary.BigEndian.AppendUint16(buf, uint16(src.Poll)) + buf = binary.BigEndian.AppendUint16(buf, src.Stratum) + buf = binary.BigEndian.AppendUint16(buf, uint16(src.State)) + buf = binary.BigEndian.AppendUint16(buf, uint16(src.Mode)) + buf = binary.BigEndian.AppendUint16(buf, src.Flags) + buf = binary.BigEndian.AppendUint16(buf, src.Reachability) + buf = binary.BigEndian.AppendUint32(buf, src.SinceSample) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.OrigLatestMeas)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.LatestMeas)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.LatestMeasErr)) + + return buf +} + +func (s *Server) encodeSourceStatsReply(sequence uint32, idx int32) []byte { + if len(s.SourcesInfo) <= int(idx) { + return encodeHeader(34, 6, 3, sequence) // status invalid + } + src := s.SourcesInfo[idx].stats + + // Encode the header + buf := encodeHeader(15, 6, 0, sequence) // source data request + + // Encode data + buf = binary.BigEndian.AppendUint32(buf, src.RefID) + buf = append(buf, encodeIP(src.IPAddr)...) + buf = binary.BigEndian.AppendUint32(buf, src.NSamples) + buf = binary.BigEndian.AppendUint32(buf, src.NRuns) + buf = binary.BigEndian.AppendUint32(buf, src.SpanSeconds) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.StandardDeviation)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.ResidFreqPPM)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.SkewPPM)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.EstimatedOffset)) + buf = binary.BigEndian.AppendUint32(buf, encodeFloat(src.EstimatedOffsetErr)) + + return buf +} + +func (s *Server) encodeSourceNameReply(sequence uint32, ip net.IP) []byte { + // Encode the header + buf := encodeHeader(65, 19, 0, sequence) // source name request + + // Find the correct source + var name []byte + for _, src := range s.SourcesInfo { + if src.data != nil && src.data.IPAddr.Equal(ip) || src.stats != nil && src.stats.IPAddr.Equal(ip) { + name = []byte(src.name) + break + } + } + + // Encode data + if len(name) > 256 { + buf = append(buf, name[:256]...) + } else { + buf = append(buf, name...) + buf = append(buf, make([]byte, 256-len(name))...) + } + + return buf +} + +func encodeHeader(command, replyType, status uint16, seqnr uint32) []byte { + buf := []byte{ + 0x06, // version 6 + 0x02, // packet type 2: reply + 0x00, // res1 + 0x00, // res2 + } + buf = binary.BigEndian.AppendUint16(buf, command) // command + buf = binary.BigEndian.AppendUint16(buf, replyType) // reply type + buf = binary.BigEndian.AppendUint16(buf, status) // status 0: success + buf = append(buf, []byte{ + 0x00, 0x00, // pad1 + 0x00, 0x00, // pad2 + 0x00, 0x00, // pad3 + }...) + buf = binary.BigEndian.AppendUint32(buf, seqnr) // sequence number + buf = append(buf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) // pad 4 & 5 + + return buf +} + +func encodeIP(addr net.IP) []byte { + var buf []byte + + buf = append(buf, addr.To16()...) + if len(addr) == 4 { + buf = append(buf, 0x00, 0x01) // IPv4 address family + } else { + buf = append(buf, 0x00, 0x02) // IPv6 address family + } + buf = append(buf, 0x00, 0x00) // padding + + return buf +} + +func decodeIP(buf []byte) net.IP { + if len(buf) != 20 { + panic("invalid length for IP") + } + + addr := net.IP(buf[0:16]) + family := binary.BigEndian.Uint16(buf[16:18]) + if family == 1 { + return addr.To4() + } + + return addr +} + // Modified based on https://github.com/mlichvar/chrony/blob/master/util.c const ( floatExpBits = int32(7) diff --git a/plugins/inputs/chrony/sample.conf b/plugins/inputs/chrony/sample.conf index d9e1afb4b..22347a810 100644 --- a/plugins/inputs/chrony/sample.conf +++ b/plugins/inputs/chrony/sample.conf @@ -12,3 +12,12 @@ ## Try to resolve received addresses to host-names via DNS lookups ## Disabled by default to avoid DNS queries especially for slow DNS servers. # dns_lookup = false + + ## Metrics to query named according to chronyc commands + ## Available settings are: + ## activity -- number of peers online or offline + ## tracking -- information about system's clock performance + ## serverstats -- chronyd server statistics + ## sources -- extended information about peers + ## sourcestats -- statistics on peers + # metrics = ["tracking"]